博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty中级篇(2)
阅读量:6836 次
发布时间:2019-06-26

本文共 29117 字,大约阅读时间需要 97 分钟。

 

上一篇 

一、编码解码技术

如何评价一个编解码技术:

  • 是否支持跨语言,或者说支持的语言是否丰富
  • 编码码流大小,影响传输速度
  • 编码和解码的性能,即时间
  • 类库是否精致,API是否方便
  • 使用难度

1. Java序列化缺点

Java也提供了序列化技术,在工业化工程中有以下缺点:

  • 无法跨语言
  • 序列化后的码流太大
  • 序列化的性能太差

下面我们来测试以下jdk序列化的问题

创建一个测试类UserInfo:

1 import java.io.Serializable; 2 import java.nio.ByteBuffer; 3  4 /** 5  * @author Administrator 6  * @version 1.0 7  * @date 2014年2月23日 8  */ 9 public class UserInfo implements Serializable {10 11     /**12      * 默认的序列号13      */14     private static final long serialVersionUID = 1L;15 16     private String userName;17 18     private int userID;19 20     public UserInfo buildUserName(String userName) {21         this.userName = userName;22         return this;23     }24 25     public UserInfo buildUserID(int userID) {26         this.userID = userID;27         return this;28     }29 30     /**31      * @return the userName32      */33     public final String getUserName() {34         return userName;35     }36 37     /**38      * @param userName the userName to set39      */40     public final void setUserName(String userName) {41         this.userName = userName;42     }43 44     /**45      * @return the userID46      */47     public final int getUserID() {48         return userID;49     }50 51     /**52      * @param userID the userID to set53      */54     public final void setUserID(int userID) {55         this.userID = userID;56     }57 58     /**59      * 将当前对象转换一个byte[]数组60      * @return61      */62     public byte[] codeC() {63         ByteBuffer buffer = ByteBuffer.allocate(1024);64         //写入userName长度和内容65         byte[] value = this.userName.getBytes();66         buffer.putInt(value.length);67         buffer.put(value);68         //直接写入Id69         buffer.putInt(this.userID);70         buffer.flip();71         value = null;72         byte[] result = new byte[buffer.remaining()];73         buffer.get(result);74         return result;75     }76 77     public byte[] codeC(ByteBuffer buffer) {78         buffer.clear();79         byte[] value = this.userName.getBytes();80         buffer.putInt(value.length);81         buffer.put(value);82         buffer.putInt(this.userID);83         buffer.flip();84         value = null;85         byte[] result = new byte[buffer.remaining()];86         buffer.get(result);87         return result;88     }89 }

其中的codeC是最朴素的编码方法,我们来和它比较以下

比较大小:

import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;/** * @author Administrator * @version 1.0 * @date 2014年2月23日 */public class TestUserInfo {    /**     * @param args     * @throws IOException     */    public static void main(String[] args) throws IOException {        UserInfo info = new UserInfo();        info.buildUserID(100).buildUserName("Welcome to Netty");        ByteArrayOutputStream bos = new ByteArrayOutputStream();        ObjectOutputStream os = new ObjectOutputStream(bos);        os.writeObject(info);        os.flush();        os.close();        byte[] b = bos.toByteArray();        System.out.println("The jdk serializable length is : " + b.length);        bos.close();        System.out.println("-------------------------------------");        System.out.println("The byte array serializable length is : "                + info.codeC().length);    }}

结果有点不能接受,这么一点就大了6倍

"C:\Program Files (x86)\Java\jdk1.8.0_102\bin\java" -Didea.launcher.port=7537 "-Didea.launcher.bin.path=C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\rt.jar;G:\projects-helloworld\netty\target\classes;G:\repo\maven\io\netty\netty-all\4.1.5.Final\netty-all-4.1.5.Final.jar;C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain demo.codec.serializable.TestUserInfoThe jdk serializable length is : 117-------------------------------------The byte array serializable length is : 24

比较下时间

import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;import java.nio.ByteBuffer;/** * @author Administrator * @version 1.0 * @date 2014年2月23日 */public class PerformTestUserInfo {    /**     * @param args     * @throws IOException     */    public static void main(String[] args) throws IOException {        UserInfo info = new UserInfo();        info.buildUserID(100).buildUserName("Welcome to Netty");        int loop = 1000000;        ByteArrayOutputStream bos = null;        ObjectOutputStream os = null;        long startTime = System.currentTimeMillis();        for (int i = 0; i < loop; i++) {            bos = new ByteArrayOutputStream();            os = new ObjectOutputStream(bos);            os.writeObject(info);            os.flush();            os.close();            byte[] b = bos.toByteArray();            bos.close();        }        long endTime = System.currentTimeMillis();        System.out.println("The jdk serializable cost time is  : "                + (endTime - startTime) + " ms");        System.out.println("-------------------------------------");        ByteBuffer buffer = ByteBuffer.allocate(1024);        startTime = System.currentTimeMillis();        for (int i = 0; i < loop; i++) {            byte[] b = info.codeC(buffer);        }        endTime = System.currentTimeMillis();        System.out.println("The byte array serializable cost time is : "                + (endTime - startTime) + " ms");    }}

运行结果,jdk的慢了10倍都不止

The jdk serializable cost time is  : 1928 ms-------------------------------------The byte array serializable cost time is : 164 ms

2. 主流的编解码框架简介

  • Google的Protobuf
  • Facebook的Thrift
  • JBoss Marshalling

这里主要介绍这3种,还有其他著名比如Hryo等等...

Google ProtoBuf

google内部久经考验。它将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关方法和属性。

特点:

  •   结构化数据存储格式
  •   性能高效
  •   语言无关、平台无关、扩展性
  •   官方支持Java、C++和Python三种语言

(1) ProtoBuf使用二进制编码,而不是XML,尽管XML的可读性和扩展性都不错,但是XML牺牲的空间和时间开销太大,不适合高性能框架

(2) ProtoBuf另一个吸引人的地方是数据描述文件和代码生成机制

下面的图很有说服力,为什么这么多人选择Google的Protobuf

性能对比:

 

 码流对比:

 

 Facebook的Thrift

对当时的Facebook而言,thrift用于解决各系统间大数量的传输通信问题,因此可以多种语言,C++ C# Cocoa Erlang Haskell Java Perl PHP Python Ruby和Smalltalk

  • Thrift可以作为高性能的通信中间件,支持数据序列化的多种类型的RPC服务。
  • 适用于静态数据交换,即事先确定好它的数据结构,当数据结构变化时,必须重新编辑IDL文件,生成代码和编译。
  • 相对于XML和Json在性能和传输大小上有明显优势。

Thrift主要由5部分组成:

(1) 语言系统和IDL编译器:负责由用户给定的IDL文件生成相应语言接口代码;

(2) TProtocol: RPC协议层,可以选择多种不同的序列化方式,例如Binary和Json;

(3) TTransport:RPC传输层,同样可以选择不同的传输层实现,例如socket NIO和MemoryBuffer等;

(4) Tprocessor: 作为协议层和用户提供的服务实现的纽带,负责调用服务实现的接口;

(5) TServer:聚合TProtocol、TTransport和TProcessor等对象。

关注协议的话就是关于于Tprotocol层,其支持3中典型的编解码方式:

  • 通用二进制
  • 压缩二进制
  • 优化可选字段的压缩编解码

下图展示同等测试条件下的编解码耗时信息:

 

JBoss Marshalling

JBoss内部使用,不能跨语言,可以看做是jdk的进化版... 拥有优点如下:

  • 可插拔的类解析器、更加便捷的类加载定制策略,通过一个接口实现定制;
  • 可插拔的对象替换方式,不需要继续的方式;
  • 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的序列化对象性能;
  • 无须实现java.io.Serializable接口,实现序列化;
  • 利用了缓存技术提升性能

二、MessagePack编解码技术

2.1 介绍

高效、性能、跨语言、码流小、支持的语言由Java Python Ruby Hashkell C# OCaml Lua Go C C++等。

pom文件,guava是额外可以不用.

org.msgpack
msgpack
0.6.11
com.google.guava
guava
20.0

Java API

import com.google.common.collect.Lists;import org.msgpack.MessagePack;import org.msgpack.template.Templates;import java.util.List;/** * Created by carl.yu on 2016/12/15. */public class ApiDemo {    public static void main(String[] args) throws Exception {        //使用了guava        List
src = Lists.newArrayList("msgpack", "kumofs", "viver"); MessagePack msgpack = new MessagePack(); //序列化 byte[] raw = msgpack.write(src); //反序列化 List
dst1 = msgpack.read(raw, Templates.tList(Templates.TString)); System.out.println(dst1); }}

2.2 编写Encoder和Decoder

注意,要使用Messagepack,需要在实体类前加上注解@Message.

import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import org.msgpack.MessagePack;import java.util.List;/** * Created by carl.yu on 2016/12/15. */public class MsgpackDecoder extends MessageToMessageDecoder
{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List
out) throws Exception { //将msg中的字节写到array中 System.out.println("开始进行解码..."); final byte[] array; final int length = msg.readableBytes(); array = new byte[length]; msg.getBytes(msg.readerIndex(), array, 0, length); MessagePack msgpack = new MessagePack(); Object result = msgpack.read(array); out.add(result); }}
import com.google.common.base.Throwables;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import org.msgpack.MessagePack;/** * Created by carl.yu on 2016/12/15. */public class MsgpackEncoder extends MessageToByteEncoder {    @Override    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {        //负责将POJO对象编码为byte数组        MessagePack msgpack = new MessagePack();        byte[] raw = null;        try {            raw = msgpack.write(msg);        } catch (Exception e) {            e.printStackTrace();            Throwables.propagateIfPossible(e);        }        out.writeBytes(raw);    }}

分别用MessagePack进行编解码

2.3 编写Server和ServerHandler

1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;10 import io.netty.handler.codec.LengthFieldPrepender;11 import io.netty.handler.logging.LogLevel;12 import io.netty.handler.logging.LoggingHandler;13 14 /**15  * Created by carl.yu on 2016/12/15.16  */17 public class EchoServer {18     public void bind(int port) throws Exception {19         // 配置服务端的NIO线程组20         EventLoopGroup bossGroup = new NioEventLoopGroup();21         EventLoopGroup workerGroup = new NioEventLoopGroup();22         try {23             ServerBootstrap b = new ServerBootstrap();24             b.group(bossGroup, workerGroup)25                     .channel(NioServerSocketChannel.class)26                     .option(ChannelOption.SO_BACKLOG, 100)27                     .handler(new LoggingHandler(LogLevel.INFO))28                     .childHandler(new ChannelInitializer
() {29 @Override30 public void initChannel(SocketChannel ch)31 throws Exception {32 //读数据的时候用decoder解码33 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));34 ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());35 //写数据的时候用encoder编码36 ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));37 ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());38 //39 ch.pipeline().addLast(new EchoServerHandler());40 }41 });42 43 // 绑定端口,同步等待成功44 ChannelFuture f = b.bind(port).sync();45 46 // 等待服务端监听端口关闭47 f.channel().closeFuture().sync();48 } finally {49 // 优雅退出,释放线程池资源50 bossGroup.shutdownGracefully();51 workerGroup.shutdownGracefully();52 }53 }54 55 public static void main(String[] args) throws Exception {56 int port = 8080;57 if (args != null && args.length > 0) {58 try {59 port = Integer.valueOf(args[0]);60 } catch (NumberFormatException e) {61 // 采用默认值62 }63 }64 new EchoServer().bind(port);65 }66 }

主要在于2个编解码器。

在MessagePack编码器之前增加了LengthFieldPrepender,它将在ByteBuf之前增加字节的消息长度。

  然后使用LengthFieldBasedFrameDecoder根据消息长度进行解码,工作原理如图:

这样获取到的永远是整包消息,非常简单的解决了烦人的半包问题

2.4 编写Client和ClientHandler

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;/** * Created by carl.yu on 2016/12/15. */public class EchoClient {    public void connect(int port, String host) throws Exception {        // 配置客户端NIO线程组        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)                    .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { //读数据的时候用decoder解码 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder()); //写数据的时候用encoder编码 ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder()); ch.pipeline().addLast(new EchoClientHandler(100)); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new EchoClient().connect(port, "127.0.0.1"); }}
import demo.codec.serializable.UserInfo;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * Created by carl.yu on 2016/12/15. */public class EchoClientHandler extends ChannelInboundHandlerAdapter {    private final int sendNumber;    public EchoClientHandler(int sendNumber) {        this.sendNumber = sendNumber;    }    private UserInfo[] userInfo() {        UserInfo[] userInfos = new UserInfo[sendNumber];        UserInfo userInfo = null;        for (int i = 0; i < sendNumber; i++) {            userInfo = new UserInfo();            userInfos[i] = userInfo;            userInfo.setUserID(i);            userInfo.setUserName("ABDCEFG-->" + i);        }        return userInfos;    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {       /* UserInfo userInfo = new UserInfo();        userInfo.setUserID(0);        userInfo.setUserName("ABDCEFG-->" + 0);*/        UserInfo[] userInfos = userInfo();        for (int i = 0; i < userInfos.length; i++) {            ctx.writeAndFlush(userInfos[i]);        }    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("客户端收到信息:" + msg);//        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//        ctx.flush();//        ctx.close();    }}

后面我们会更加详细的讲解LengthFieldPrepender和LengthFieldBasedFrameDecoder,这里只需要明白用来解决半包问题即可。

三、Google Protobuf

3.1 测试Google Protobuf

准备环境:

SubscribeReq.proto:

package netty;option java_package="demo.codec.protobuf";option java_outer_classname="SubscribeReqProto";message SubscribeReq{    required int32 subReqID = 1;    required string userName = 2;    required string productName = 3;    repeated string address = 4;}

SubscribeResp.proto

package netty;option java_package="demo.codec.protobuf";option java_outer_classname="SubscribeRespProto";message SubscribeResp{    required int32 subReqID = 1;    required int32 respCode = 2;    required string desc = 3;}

这里不详细介绍google protobuf的语法:

build.bat

protoc ./proto/*.proto --java_out=../main/javapause

google protobuf依赖maven:

com.google.protobuf
protobuf-java
2.5.0

运行build.bat,生成:

下面我们运行以下代码来了解Protobuf的用法:

import com.google.protobuf.InvalidProtocolBufferException;import java.util.ArrayList;import java.util.List;/** * @author Administrator * @version 1.0 * @date 2014年2月23日 */public class TestSubscribeReqProto {    // 编码方法: Object->byte[]    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {        return req.toByteArray();    }    // 解码方法: bayte[] -> Object    private static SubscribeReqProto.SubscribeReq decode(byte[] body)            throws InvalidProtocolBufferException {        return SubscribeReqProto.SubscribeReq.parseFrom(body);    }    /**     * 创建实例     *     * @return     */    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {        //(1) Builder模式        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq                .newBuilder();        builder.setSubReqID(1);        builder.setUserName("Lilinfeng");        builder.setProductName("Netty Book");        List
address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } /** * @param args * @throws InvalidProtocolBufferException */ public static void main(String[] args) throws InvalidProtocolBufferException { SubscribeReqProto.SubscribeReq req = createSubscribeReq(); System.out.println("Before encode : " + req.toString()); SubscribeReqProto.SubscribeReq req2 = decode(encode(req)); System.out.println("After decode : " + req.toString()); System.out.println("Assert equal : --> " + req2.equals(req)); }}

3.2 开发图书订购服务端

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class SubReqServer {    public void bind(int port) throws Exception {        // 配置服务端的NIO线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 100)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeReqProto.SubscribeReq .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqServer().bind(port); }}

我们来注意以下编解码器的顺序:

(1) ProtobufVarint32FrameDecoder : 半包问题

(2) ProtobufDecoder:解码

(3) ProtobufVarint32LenghtFiedldPrepender:半包问题

(4) ProtobufEncoder:编码

于是逻辑处理部分可以直接使用类:

1 import io.netty.channel.ChannelHandler.Sharable; 2 import io.netty.channel.ChannelHandlerAdapter; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5  6 /** 7  * @author lilinfeng 8  * @version 1.0 9  * @date 2014年2月14日10  */11 @Sharable12 public class SubReqServerHandler extends ChannelInboundHandlerAdapter {13 14     @Override15     public void channelRead(ChannelHandlerContext ctx, Object msg)16             throws Exception {17         SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;18         if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {19             System.out.println("Service accept client subscribe req : ["20                     + req.toString() + "]");21             ctx.writeAndFlush(resp(req.getSubReqID()));22         }23     }24 25     private SubscribeRespProto.SubscribeResp resp(int subReqID) {26         SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp27                 .newBuilder();28         builder.setSubReqID(subReqID);29         builder.setRespCode(0);30         builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");31         return builder.build();32     }33 34     @Override35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {36         cause.printStackTrace();37         ctx.close();// 发生异常,关闭链路38     }39 }

3.3 图书订购客户端开发

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class SubReqClient {    public void connect(int port, String host) throws Exception {        // 配置客户端NIO线程组        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast( new ProtobufDecoder( SubscribeRespProto.SubscribeResp .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqClient().connect(port, "127.0.0.1"); }}
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.ArrayList;import java.util.List;/** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */public class SubReqClientHandler extends ChannelInboundHandlerAdapter {    /**     * Creates a client-side handler.     */    public SubReqClientHandler() {    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        for (int i = 0; i < 10; i++) {            ctx.write(subReq(i));        }        ctx.flush();    }    private SubscribeReqProto.SubscribeReq subReq(int i) {        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq                .newBuilder();        builder.setSubReqID(i);        builder.setUserName("Lilinfeng");        builder.setProductName("Netty Book For Protobuf");        List
address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}

在不怎么了解Protobuf实现和使用细节的情况 下,我们就可以轻松支持Google Protobuf编码

3.4 注意事项

ProtobufDecoder仅仅负责解码,因此在ProtobufDecoder前面,一定要能够处理半包的解码器,有以下3种方式:

(1) 使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息;

(2) 继承Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;

(3) 继承ByteToMessageDecoder,自己处理..

半包问题必须解决,否则服务器无法正常工作。

四、JBoss Marshalling编解码

暂时略。可以参考。

 

转载于:https://www.cnblogs.com/carl10086/p/6183687.html

你可能感兴趣的文章
接口和抽象类对比
查看>>
memcached 常用命令及使用说明
查看>>
双链表
查看>>
.NET基础——数组
查看>>
解决 android 高低版本 webView 里内容 自适应屏幕的终极方法
查看>>
调用微信截图功能c# 截图带扩展名
查看>>
AC日记——830A - Office Keys
查看>>
实用js
查看>>
Linux的快速入门
查看>>
利用 Dolby® Digital Plus 提供优质音频体验
查看>>
【转载】27.SpringBoot和SpringMVC的区别
查看>>
Spring Mvc 实例
查看>>
MySQL深入理解
查看>>
三步快速解决dll冲突问题
查看>>
vue
查看>>
[JSOI2007]文本生成器
查看>>
Python基础算法综合:加减乘除四则运算方法
查看>>
《一面》
查看>>
sed命令详解
查看>>
【cl】找不到火狐Cannot find firefox binary in PATH
查看>>