7.1 详解粘包和拆包
7.1.1 半包问题的实战案例
package com.crazymakercircle.netty.echoServer;//…public class NettyDumpSendClient {private int serverPort;private String serverIp;Bootstrap b = new Bootstrap();public NettyDumpSendClient(String ip, int port) {this.serverPort = port;this.serverIp = ip;}public void runClient() {//创建反应器线程组//省略启动客户端Bootstrap引导类配置和启动//阻塞,直到连接完成f.sync();Channel channel = f.channel();//发送大量的文字String content= "疯狂创客圈:高性能学习者社群!";byte[] bytes =content.getBytes(Charset.forName("utf-8"));for (int i = 0; i< 1000; i++) {//发送ByteBufByteBuf buffer = channel.alloc().buffer();buffer.writeBytes(bytes);channel.writeAndFlush(buffer);}//省略优雅关闭客户端}public static void main(String[] args) throws InterruptedException {int port = NettyDemoConfig.SOCKET_SERVER_PORT;String ip = NettyDemoConfig.SOCKET_SERVER_IP;new NettyDumpSendClient(ip, port).runClient();}}
7.1.2 什么是半包问题
- 粘包:接收端(Receiver)收到一个ByteBuf,包含了发送端(Sender)的多个ByteBuf,发送端的多个ByteBuf在接收端“粘”在了一起
- 半包:Receiver将Sender的一个ByteBuf“拆”开了收,收到多个破碎的包。换句话说,Receiver收到了Sender的一个ByteBuf的一小部分。
7.2 使用JSON协议通信
JSON(JavaScript Object Notation,JS对象简谱)是一种轻量级的数据交换格式。它是基于ECMAScript(欧洲计算机协会制定的JS规范)的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得JSON成为理想的数据交换语言。
JSON协议是一种文本协议,易于人阅读和编写,同时也易于机器解析和生成,并能有效地提升网络传输效率。
7.2.1 JSON的核心优势
XML是一种常用的文本协议,和JSON一样都使用结构化方法来标记数据。和XML相比,JSON作为数据包格式传输的时候具有更高的效率。这是因为JSON不像XML那样需要有严格的闭合标签,让有效数据量与总数据包比大大提升,从而在同等数据流量的情况下减少了网络的传输压力。
7.2.2 JSON序列化与反序列化开源库
Java处理JSON数据有三个比较流行的开源类库:阿里巴巴的FastJson、谷歌的Gson和开源社区的Jackson。
在实际开发中,目前主流的策略是Gson和FastJson结合使用。在POJO序列化成JSON字符串的应用场景下,使用谷歌的Gson库;在JSON字符串反序列化成POJO的应用场景下,使用阿里巴巴的FastJson库。
package com.crazymakercircle.util;//省略importpublic class JsonUtil {//谷歌GsonBuilder构造器static GsonBuilder gb = new GsonBuilder();static {//不需要html escapegb.disableHtmlEscaping();}//序列化:使用Gson将 POJO 转成字符串public static String pojoToJson(java.lang.Object obj) {String json = gb.create().toJson(obj);return json;}//反序列化:使用Fastjson将字符串转成 POJO对象public static <T> T jsonToPojo(String json, Class<T>tClass) {T t = JSONObject.parseObject(json, tClass);return t;}}
7.2.3 JSON序列化与反序列化的实战案例
首先定义一个POJO类,名称为JsonMsg,包含id和content两个属性,然后使用lombok开源库的@Data注解为属性加上getter()和setter()方法。POJO类的源码如下:
@Datapublic class JsonMsg {private int id;private String content;public String convertToJson() {return JsonUtil.pojoToJson(this);}public static JsonMsg parseFromJson(String json) {return JsonUtil.jsonToPojo(json, JsonMsg.class);}}
使用POJO类JsonMsg的序列化、反序列化的实战案例代码如下:
public class JsonMsgDemo {public JsonMsg buildMsg() {JsonMsg user = new JsonMsg();user.setId(1000);user.setContent("弯弯入我心,秋凉知我意!");return user;}@Testpublic void serAndDesr() {JsonMsg message = buildMsg();String json = message.convertToJson();Logger.info("json:=" + json);JsonMsg msg = JsonMsg.parseFromJson(json);Logger.info("id:=" + msg.getId());Logger.info("content:=" + msg.getContent());}}[main|JsonMsgDemo.serAndDesr] |> json:={"id":1000,"content":"弯弯入我心,秋凉知我意!"}[main|JsonMsgDemo.serAndDesr] |> id:=1000[main|JsonMsgDemo.serAndDesr] |> content:=弯弯入我心,秋凉知我意!
7.2.4 JSON传输的编码器和解码器
Head-Content数据包的解码过程
Head-Content数据包的编码过程
Netty内置LengthFieldPrepender编码器的作用是在数据包的前面加上内容的二进制字节数组的长度。这个编码器和LengthFieldBasedFrameDecoder解码器是天生的一对,常常配套使用。这组“天仙配”属于Netty所提供的一组非常重要的编码器和解码器,常常用于Head-Content数据包的传输。
//构造器一public LengthFieldPrepender(int lengthFieldLength) {this(lengthFieldLength, false);}//构造器二public LengthFieldPrepender(int lengthFieldLength, Boolean lengthIncludesLengthFieldLength){this(lengthFieldLength, 0, lengthIncludesLengthFieldLength);}//省略其他的构造器
在上面的构造器中,第一个参数lengthFieldLength表示Head长度字段所占用的字节数,第二个参数lengthIncludesLengthFieldLength表示Head字段的总长度值是否包含长度字段自身的字节数,如果该参数的值为true,表示长度字段的值(总长度)包含了自己的字节数。如果该参数的值为false,表示长度值只包含内容的二进制数据的长度。lengthIncludesLengthFieldLength值一般设置为false。
7.2.5 JSON传输的服务端的实战案例
服务端的程序仅仅读取客户端数据包并完成解码,服务端的程序没有写出任何输出数据包到对端(客户端)
package com.crazymakercircle.netty.protocol;//…public class JsonServer {//省略成员属性、构造器public void runServer() {//创建反应器线程组EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);EventLoopGroup workerLoopGroup = new NioEventLoopGroup();try {//省略引导类的反应器线程、设置配置项等//5 装配子通道流水线b.childHandler(new ChannelInitializer<SocketChannel>() {//有连接到达时会创建一个通道protected void initChannel(SocketChannel ch)…{//管理子通道中的Handler//向子通道流水线添加3个Handlerch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));ch.pipeline().addLast(newStringDecoder(CharsetUtil.UTF_8));ch.pipeline().addLast(new JsonMsgDecoder());}});//省略端口绑定、服务监听、优雅关闭}//服务端业务处理器static class JsonMsgDecoderextends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)…{String json = (String) msg;JsonMsg jsonMsg = JsonMsg.parseFromJson(json);Logger.info("收到一个 Json 数据包 =>>" + jsonMsg);}}public static void main(String[] args) throws InterruptedException {int port = NettyDemoConfig.SOCKET_SERVER_PORT;new JsonServer(port).runServer();}}
7.2.6 JSON传输的客户端的实战案例
- 通过谷歌的Gson框架,将POJO序列化成JSON字符串。
- 使用StringEncoder编码器(Netty内置)将JSON字符串编码成二进制字节数组。
使用LengthFieldPrepender编码器(Netty内置)将二进制字节数组编码成Head-Content格式的二进制数据包。 ```java package com.crazymakercircle.netty.protocol; //… public class JsonSendClient { static String content = “疯狂创客圈:高性能学习社群!”; //省略成员属性、构造器 public void runClient() {
//创建反应器线程组EventLoopGroup workerLoopGroup = new NioEventLoopGroup();try {//省略引导类的反应器线程、设置配置项等//5 装配通道流水线b.handler(new ChannelInitializer<SocketChannel>() {//初始化客户端通道protected void initChannel(SocketChannel ch) …{//客户端通道流水线添加2个Handlerch.pipeline().addLast(new LengthFieldPrepender(4));ch.pipeline().addLast(newStringEncoder(CharsetUtil.UTF_8));}});ChannelFuture f = b.connect();//…//阻塞,直到连接完成f.sync();Channel channel = f.channel();//发送 JSON 字符串对象for (int i = 0; i< 1000; i++) {JsonMsg user = build(i, i + "->" + content);
<a name="tMapu"></a>## 7.3 使用Protobuf协议通信Protobuf(Protocol Buffer)是Google提出的一种数据交换格式,是一套类似JSON或者XML的数据传输格式和规范,用于不同应用或进程之间的通信。Protobuf具有以下特点:1. 语言无关,平台无关1. 高效1. 扩展性、兼容性好<a name="Ui25f"></a>## 7.3.1 一个简单的proto文件的实战案例下面介绍一个非常简单的proto文件:仅仅定义一个消息结构体,并且该消息结构体也非常简单,仅包含两个字段。实例如下:```java//[开始头部声明]syntax = "proto3";package com.crazymakercircle.netty.protocol;//[结束头部声明]//[开始 Java选项配置]option java_package = "com.crazymakercircle.netty.protocol";option java_outer_classname = "MsgProtos";//[结束 Java选项配置]//[开始消息定义]message Msg {uint32 id = 1; //消息IDstring content = 2; //消息内容}//[结束消息定义]
7.3.2 通过控制台命令生成POJO和Builder
7.3.3 通过Maven插件生成POJO和Builder
<plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.5.0</version><extensions>true</extensions><configuration><!--proto文件路径--><protoSourceRoot>${project.basedir}/protobuf</protoSourceRoot><!--目标路径--><outputDirectory>${project.build.sourceDirectory}</outputDirectory><!--设置是否在生成java文件之前清空outputDirectory的文件--><clearOutputDirectory>false</clearOutputDirectory><!--临时目录--><temporaryProtoFileDirectory>${project.build.directory}/protoc-temp</temporaryProtoFileDirectory><!--protoc 可执行文件路径--><protocExecutable>${project.basedir}/protobuf/protoc3.6.1.exe</protocExecutable></configuration><executions><execution><goals><goal>compile</goal><goal>test-compile</goal></goals></execution></executions></plugin>
7.3.4 Protobuf序列化与反序列化的实战案例
在Maven的pom.xml文件中加上protobuf的Java运行包的依赖,代码如下
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.6.1</version></dependency>
- 使用Builder构造POJO消息对象
```java
package com.crazymakercircle.netty.protocol;
//…
public class ProtobufDemo {
public static MsgProtos.Msg buildMsg() {
} //… }MsgProtos.Msg.Builder personBuilder = MsgProtos.Msg.newBuilder();personBuilder.setId(1000);personBuilder.setContent("疯狂创客圈:高性能学习社群");MsgProtos.Msg message = personBuilder.build();return message;
2. 序列化与反序列化的方式一方式一为调用Protobuf POJO对象的toByteArray()方法将POJO对象序列化成字节数组,具体的代码如下:```javapackage com.crazymakercircle.netty.protocol;//…public class ProtobufDemo {//第1种方式:序列化与反序列化@Testpublic void serAndDesr1() throws IOException {MsgProtos.Msg message = buildMsg();//将Protobuf对象序列化成二进制字节数组byte[] data = message.toByteArray();//可以用于网络传输,保存到内存或外存ByteArrayOutputStream outputStream = new ByteArrayOutputStream();outputStream.write(data);data = outputStream.toByteArray();//二进制字节数组反序列化成Protobuf对象MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data);Logger.info("id:=" + inMsg.getId());Logger.info("content:=" + inMsg.getContent());}//…}
序列化与反序列化的方式二 ```java package com.crazymakercircle.netty.protocol; //… public class ProtobufDemo {
//…
//第2种方式:序列化与反序列化 @Test public void serAndDesr2() throws IOException {
MsgProtos.Msg message = buildMsg();//序列化到二进制码流ByteArrayOutputStream outputStream = new ByteArrayOutputStream();message.writeTo(outputStream);ByteArrayInputStream inputStream =new ByteArrayInputStream(outputStream.toByteArray());//从二进码流反序列化成Protobuf对象MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(inputStream);Logger.info("id:=" + inMsg.getId());Logger.info("content:=" + inMsg.getContent());
} }
4. 序列化与反序列化的方式三```javapackage com.crazymakercircle.netty.protocol;//…public class ProtobufDemo {//…//第3种方式:序列化与反序列化//带字节长度:[字节长度][字节数据],用于解决粘包/半包问题@Testpublic void serAndDesr3() throws IOException {MsgProtos.Msg message = buildMsg();//序列化到二进制码流ByteArrayOutputStream outputStream =new ByteArrayOutputStream();message.writeDelimitedTo(outputStream);ByteArrayInputStream inputStream =new ByteArrayInputStream(outputStream.toByteArray());//从二进制码字节流反序列化成Protobuf对象MsgProtos.Msg inMsg =MsgProtos.Msg.parseDelimitedFrom(inputStream);Logger.info("id:=" + inMsg.getId());Logger.info("content:=" + inMsg.getContent());}}
反序列化时,调用Protobuf生成的POJO类的parseDelimitedFrom(InputStream)静态方法,从输入流中先读取varint32类型的长度值,然后根据长度值读取此消息的二进制字节,再反序列化得到POJO新的实例。
7.4 Protobuf编解码的实战案例
Netty默认支持Protobuf的编码与解码,内置了一套基础的Protobuf编码和解码器。
7.4.1 Netty内置的Protobuf基础编码器/解码器
- ProtobufEncoder编码器
直接调用了Protobuf POJO实例的toByteArray()方法将自身编码成二进制字节,然后放入Netty的ByteBuf缓冲区中,接着会被发送到下一站编码器。
package io.netty.handler.codec.protobuf;…@Sharablepublic class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {if (msg instanceof MessageLite) {out.add(Unpooled.wrappedBuffer( ((MessageLite) msg).toByteArray()));return;}if (msg instanceof MessageLite.Builder) {out.add(Unpooled.wrappedBuffer(( (MessageLite.Builder) msg).build().toByteArray()));}}}
- ProtobufDecoder解码器
ProtobufDecoder和ProtobufEncoder相互对应,只不过在使用的时候ProtobufDecoder解码器需要指定一个Protobuf POJO实例作为解码的参考原型(prototype),解码时会根据原型实例找到对应的Parser解析器,将二进制的字节解码为Protobuf POJO实例。
- ProtobufVarint32LengthFieldPrepender长度编码器
这个编码器的作用是在ProtobufEncoder生成的字节数组之前前置一个varint32数字,表示序列化的二进制字节数量或者长度。
ProtobufVarint32FrameDecoder长度解码器
7.4.2 Protobuf传输的服务端的实战案例
服务端的实战案例程序代码如下
package com.crazymakercircle.netty.protocol;//…public class ProtoBufServer{//省略成员属性、构造器public void runServer(){//创建反应器线程组EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);EventLoopGroup workerLoopGroup = new NioEventLoopGroup();try{//省略引导类的反应器线程、设置配置项//5 装配子通道流水线b.childHandler(new ChannelInitializer<SocketChannel>(){//有连接到达时会创建一个通道protected void initChannel(SocketChannel ch) …{//流水线管理子通道中的Handler业务处理器//向子通道流水线添加3个Handler业务处理器ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());ch.pipeline().addLast(new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));ch.pipeline().addLast(new ProtobufBussinessDecoder());}});//省略端口绑定、服务监听、优雅关闭}//服务端的Protobuf业务处理器static class ProtobufBussinessDecoderextends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) … {MsgProtos.Msg protoMsg = (MsgProtos.Msg) msg;//经过流水线的各个解码器取得了POJO实例Logger.info("收到一个ProtobufPOJO =>>");Logger.info("protoMsg.getId():=" + protoMsg.getId());Logger.info("protoMsg.getContent():=" +protoMsg.getContent());}}}public static void main(String[] args) throws InterruptedException{int port = NettyDemoConfig.SOCKET_SERVER_PORT;new ProtoBufServer(port).runServer();}}
7.4.3 Protobuf传输的客户端的实战案例
使用Netty内置的ProtobufEncoder将Protobuf POJO对象编码成二进制的字节数组。
- 使用Netty内置的ProtobufVarint32LengthFieldPrepender编码器,加上varint32格式的可变长度。Netty会将完成了编码后的Length+Content格式的二进制字节码发送到服务端。

package com.crazymakercircle.netty.protocol;//…public class ProtoBufSendClient {static String content = "疯狂创客圈:高性能学习社群!";//省略成员属性、构造器public void runClient() {//创建反应器线程组EventLoopGroup workerLoopGroup = new NioEventLoopGroup();try {//省略反应器组、IO通道、通道参数等设置//5 装配通道流水线b.handler(new ChannelInitializer<SocketChannel>() {//初始化客户端通道protected void initChannel(SocketChannel ch) …{//客户端流水线添加2个Handler业务处理器ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());ch.pipeline().addLast(new ProtobufEncoder());}});ChannelFuture f = b.connect();//…//阻塞,直到连接完成f.sync();Channel channel = f.channel();//发送Protobuf对象for (int i = 0; i< 1000; i++) {MsgProtos.Msg user = build(i, i + "->" + content);channel.writeAndFlush(user);Logger.info("发送报文数:" + i);}channel.flush();//省略关闭等待、优雅关闭}//构建ProtoBuf对象public MsgProtos.Msgbuild(int id, String content) {MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder();builder.setId(id);builder.setContent(content);return builder.build();}public static void main(String[] args) throws InterruptedException {int port = NettyDemoConfig.SOCKET_SERVER_PORT;String ip = NettyDemoConfig.SOCKET_SERVER_IP;new ProtoBufSendClient(ip, port).runClient();}}
7.5 详解Protobuf协议语法
在Protobuf中,通信协议的格式是通过proto文件定义的。一个proto文件有两大组成部分:头部声明、消息结构体的定义。头部声明部分主要包含了协议的版本、包名、特定语言的选项设置等;消息结构体部分可以定义一个或者多个消息结构体。
7.5.1 proto文件的头部声明
//[开始声明]syntax = "proto3";//定义Protobuf的包名称空间package com.crazymakercircle.netty.protocol;//[结束声明]//[开始 Java 选项配置]option java_package = "com.crazymakercircle.netty.protocol";option java_outer_classname = "MsgProtos";//[结束 Java 选项配置]
- syntax版本号
package包
和java类似,通过package指定包名,用来避免消息名字相冲突,如果两个消息名称相同,但package包名不同,那么可以共存 在java中,会以package指定的包名作为生成的pojo类的包名
option配置选项
与proto文件使用的一些特定语言场景有关,在java中以
java_开头的option选项会生效 选项option java_package表示protobuf编译器在生成java pojo消息类时,生成在此选项所配置的java包名下,如果没有该选项,则会以头部声明的package作为java包名 选项option java_multiple_file表示生成java类时的打包方式- 一个消息对应一个独立的java类
- 所有的消息都作为内部类,打包到一个外部类中(默认)
选项
option java_outer_classname
7.5.2 Protobuf的消息结构体与消息字段
//[开始消息定义]message Msg {uint32 id = 1; //消息IDstring content = 2; //消息内容}//[结束消息定义]
