8.1 自定义Protobuf编解码器
使用Netty内置的Protobuf系列编解码器,虽然可以解决简单的Protobuf协议的传输问题,但是对复杂Head-Content协议(例如数据包头部存在魔数、版本号字段,具体如图8-1所示)的解析,内置Protobuf系列编解码器就显得无能为力了,这种情况下需要自定义Protobuf编码器和解码器。
8.1.1 自定义Protobuf编码器
- 写入待发送的Protobuf POJO实例的二进制字节长度。
- 写入其他的字段,如魔数、版本号
写入Protobuf POJO实例的二进制字节码内容。
@Slf4jpublic class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ProtoMsg.Message message, ByteBuf byteBuf) throws Exception {encode0(message, byteBuf);}public static void encode0(ProtoMsg.Message msg, ByteBuf out) {out.writeShort(ProtoInstant.MAGIC_CODE);out.writeShort(ProtoInstant.VERSION_CODE);byte[] bytes = msg.toByteArray(); // 将ProtoMsg.Message对象转换为byteint length = bytes.length; // 读取消息长度Logger.cfo("encoder length = " + length);// 先将消息长度写入out.writeInt(length);// 消息体中包含我们要发送的数据out.writeBytes(bytes);}}
8.1.2 自定义Protobuf解码器
读取长度,如果长度位数不够,就终止读取。
- 读取魔数、版本号等其他字段。
按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是长度的位置),然后终止读取。
public class ProtobufDecoder extends ByteToMessageDecoder{@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {Object outMsg = decode0(channelHandlerContext, byteBuf);if (outMsg != null) {// 获取业务消息list.add(outMsg);}}public static Object decode0(ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {//标记下当前readIndex的位置in.markReaderIndex();// 判断包头的长度if (in.readableBytes() < 8) {return null;}//读取魔数short magic = in.readShort();if (magic != ProtoInstant.MAGIC_CODE) {String error = "客户端口令不对: " + ctx.channel().remoteAddress(); // 可以从ctx中得到channel,然后从channel中移出remoteAddress//异常连接,直接报错,关闭连接throw new InvalidFrameException(error);}// 读取版本short version = in.readShort();if (version != ProtoInstant.VERSION_CODE){String error = "协议的版本不对:" + ctx.channel().remoteAddress();throw new InvalidFrameException(error);}// 读取传送过来的消息长度int length = in.readInt();// 长度如果小于0if (length < 0) {// 非法数据,关闭连接ctx.close();}if (length > in.readableBytes()){ // 读到的消息体长度如果小于传送过来的消息长度// 重置读取位置in.resetReaderIndex();return null;}Logger.cfo("decoder length = " + in.readableBytes());byte[] array;if (in.hasArray()) { // 说明是堆缓冲// array = new byte[length];// in.readBytes(array, 0, length);ByteBuf slice = in.slice(in.readerIndex(), length);Logger.cfo("slice length=" + slice.readableBytes());array = slice.array();}else {//直接缓冲array = new byte[length];in.readBytes(array, 0, length);}//字节转对象ProtoMsg.Message outMsg = ProtoMsg.Message.parseFrom(array);return outMsg;}}
8.1.3 IM系统中Protobuf消息格式的设计
原则一:消息类型使用enum定义 ```java enum HeadType {
LOGIN_REQUEST = 0; //登录请求LOGIN_RESPONSE = 1; //登录响应LOGOUT_REQUEST = 2; //登出请求LOGOUT_RESPONSE = 3; //登出响应KEEPALIVE_REQUEST = 4; //心跳请求KEEPALIVE_RESPONSE = 5; //心跳响应MESSAGE_REQUEST = 6; //聊天消息请求MESSAGE_RESPONSE = 7; //聊天消息响应MESSAGE_NOTIFICATION = 8; //服务器通知
}
2. 原则二:使用一个Protobuf消息结构定义一类消息```java/*登录请求信息*/message LoginRequest {string uid = 1; //用户唯一IDstring deviceId = 2; //设备IDstring token = 3; //用户tokenuint32 platform = 4; //客户端平台 Windows、MAC、Android、IOS、Webstring appVersion = 5; //APP版本号}
- 原则三:建议给应答消息加上成功标记和应答序号 ```java /聊天响应/ message MessageResponse { bool result = 1; //true表示发送成功,false表示发送失败 uint32 code = 2; //错误码 string info = 3; //错误描述 uint32 expose = 4; //错误描述是否提示给用户:1 提示; 0 不提示 bool lastBlock = 5; //是否为最后的应答 fixed32 blockIndex = 6; //应答的序号 }
4. 原则四:编解码从顶层消息开始```java/*外层消息*/message Message {HeadType type = 1; //消息类型uint64 sequence = 2; //序列号string sessionId = 3; //会话IDLoginRequest loginRequest = 4; //登录请求LoginResponse loginResponse = 5; //登录响应MessageRequest messageRequest = 6; //聊天请求MessageResponse messageResponse = 7; //聊天响应MessageNotification notification = 8; //通知消息}
8.2 IM的登录流程
8.2.1 图解登录/响应流程的环节

从客户端到服务端再到客户端,9个环节的相关介绍如下:
- 客户端收集用户ID和密码,需要使用LoginConsoleCommand控制台命令类。
- 客户端发送Protobuf数据包到客户端通道,需要通过LoginSender发送器组装Protobuf数据包。
- 客户端通道将Protobuf数据包发送到对端,需要通过Netty底层来完成。
- 服务器子通道收到Protobuf数据包,需要通过Netty底层来完成。
- 服务端UserLoginHandler入站处理器收到登录消息,交给业务处理器LoginMsgProcesser处理异步的业务逻辑。
- 服务端LoginMsgProcesser处理完异步的业务逻辑,将处理结果写入用户绑定的子通道。
- 服务器子通道将登录响应Protobuf数据帧发送到客户端,需要通过Netty底层来完成。
- 客户端通道收到Protobuf登录响应数据包,需要通过Netty底层来完成。
客户端LoginResponseHandler业务处理器处理登录响应,例如设置登录的状态、保存会话的Session ID等。
8.2.2 客户端涉及的主要模块
ClientCommand模块:控制台命令收集器。
- ProtobufBuilder模块:Protobuf数据包构造者。
- Sender模块:数据包发送器。
-
8.2.3 服务端涉及的主要模块
Handler模块:客户端请求的处理。
- Processer模块:以异步方式完成请求的业务逻辑处理。
-
8.3 客户端的登录处理的实战案例
聊天命令的信息收集类:ChatConsoleCommand。
- 登录命令的信息收集类:LoginConsoleCommand。
- 退出命令的信息收集类:LogoutConsoleCommand。
- 命令的类型收集类:ClientCommandMenu。
8.3.1 LoginConsoleCommand和User POJO
```java package com.crazymakercircle.imClient.clientCommand; //… public class LoginConsoleCommand implements BaseCommand { public static final String KEY = “1”; private String userName; //简单起见,假设用户名称和id一致 private String password; //登录密码 @Override public void exec(Scanner scanner) {
} //… }System.out.println("请输入用户信息(id:password) ");String[] info = null;while (true) {String input = scanner.next();info = input.split(":");if (info.length != 2) {System.out.println("请按照格式输入(id:password):");}else {break;}}userName=info[0];password = info[1];
成功获取到用户密码和ID获取后,客户端CommandClient将这些内容组装成User POJO用户对象,然后通过客户端登录消息发送器loginSender开始向服务端发送登录请求,主要代码如下:```javapackage com.crazymakercircle.imClient.client;//…@Service("CommandClient")public class CommandClient {//…//命令收集线程public void startCommandThread() throws InterruptedException {Thread.currentThread().setName("主线程");while (true) {//建立连接while (connectFlag == false) {//开始连接startConnectServer();waitCommandThread();}//处理命令while (null != session &&session.isConnected()) {Scanner scanner = new Scanner(System.in);clientCommandMenu.exec(scanner);String key = clientCommandMenu.getCommandInput();//取到命令收集类POJOBaseCommand command = commandMap.get(key);switch (key) {//登录的命令case LoginConsoleCommand.KEY:command.exec(scanner); //收集用户name和passwordstartLogin((LoginConsoleCommand) command);break;case… //省略其他的命令收集代码}}}}//开始发送登录请求private void startLogin(LoginConsoleCommand command) {//…User user = new User();user.setUid(command.getUserName());user.setToken(command.getPassword());user.setDevId("1111");loginSender.setUser(user);loginSender.setSession(session);loginSender.sendLoginMsg();}//…}
8.3.2 LoginSender
package com.crazymakercircle.imClient.sender;//…@Slf4j@Service("LoginSender")public class LoginSender extends BaseSender {public void sendLoginMsg() {log.info("生成登录消息");ProtoMsg.Message message =LoginMsgBuilder.buildLoginMsg(getUser(), getSession());log.info ("发送登录消息");super.sendMsg(message);}}
BaseSender基类的代码如下
package com.crazymakercircle.imClient.sender;//…public abstract class BaseSender {private User user;private ClientSession session;//…public void sendMsg(ProtoMsg.Message message) {if (null == getSession() || !isConnected()) {log.info("连接还没成功");return;}Channel channel=getSession().getChannel();ChannelFuture f = channel.writeAndFlush(message);f.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future)…{//回调if (future.isSuccess()) {sendSucceed(message);} else {sendfailed(message);}}});//…}protected void sendSucceed(ProtoMsg.Message message) {log.info("发送成功");}protected void sendfailed(ProtoMsg.Message message) {log.info("发送失败");}}
在处理加入通道时,可以为处理器设置一个单独的处理器线程,大致代码如下:
//创建一个独立的线程池,假定有32条线程EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32);final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){protected void initChannel(EmbeddedChannel ch){//handlerA的执行,从threadGroup池中绑定一条线程ch.pipeline().addLast(threadGroup,handler);}};
8.3.3 ClientSession
ClientSession是一个很重要的胶水类,包含两个成员:一个是user,代表用户;另一个是channel,代表连接的通道。在实际开发中,这两个成员的作用是:
- 通过user,ClientSession可以获得当前的用户信息。
- 通过channel,ClientSession可以向服务端发送消息。
客户端会话ClientSession保存着当前的状态:
- 是否成功连接isConnected。
- 是否成功登录isLogin。
ClientSession客户端会话的主要代码如下:
package com.crazymakercircle.imClient.client;//…public class ClientSession {public static final AttributeKey<ClientSession> SESSION_KEY =AttributeKey.valueOf("SESSION_KEY");private Channel channel;private User user;private String sessionId; //保存登录后的服务端sessionidprivate Boolean isConnected = false;private Boolean isLogin = false;//绑定通道public ClientSession(Channel channel) {this.channel = channel;this.sessionId = String.valueOf(-1);//重要:ClientSession绑定到Channel上channel.attr(ClientSession.SESSION_KEY).set(this);}//登录成功之后,设置sessionIdpublic static void loginSuccess(ChannelHandlerContext ctx, ProtoMsg.Message pkg) {Channel channel = ctx.channel();ClientSession session =channel.attr(ClientSession.SESSION_KEY).get();session.setSessionId(pkg.getSessionId());session.setLogin(true);log.info("登录成功");}//获取通道public static ClientSession getSession(ChannelHandlerContext ctx) {Channel channel = ctx.channel();ClientSession session =channel.attr(ClientSession.SESSION_KEY).get();return session;}//把Protobuf数据包写入通道public ChannelFuture witeAndFlush(Object pkg) {ChannelFuture f = channel.writeAndFlush(pkg);return f;}//…}
什么时候创建客户端会话呢?在Netty客户端发起连接请求之后,增加一个连接建立完成的异步回调任务,代码如下:
package com.crazymakercircle.imClient.client;//…public class CommandController {//…GenericFutureListener<ChannelFuture> connectedListener =(ChannelFuture f) -> {final EventLoop eventLoop = f.channel().eventLoop();if (!f.isSuccess()) {log.info("连接失败!在10秒之后准备尝试重连!");eventLoop.schedule(() ->nettyClient.doConnect(),10, TimeUnit.SECONDS);connectFlag = false;} else {connectFlag = true;log.info("疯狂创客圈 IM 服务器连接成功!");channel = f.channel();//创建会话session= new ClientSession(channel);channel.closeFuture().addListener(closeListener);//唤醒用户线程notifyCommandThread();}};//…}
8.3.4 LoginResponseHandler
package com.crazymakercircle.imClient.handler;//…public class LoginResponseHandlerextends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)…{//判断消息实例if (null == msg || !(msg instanceofProtoMsg.Message)) {super.channelRead(ctx, msg);return;}//判断类型ProtoMsg.Message pkg = (ProtoMsg.Message) msg;ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {super.channelRead(ctx, msg);return;}//判断返回是否成功ProtoMsg.LoginResponse info = pkg.getLoginResponse();ProtoInstant.ResultCodeEnum result =ProtoInstant.ResultCodeEnum.values()[info.getCode()];if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {//登录失败log.info(result.getDesc());} else {//登录成功ClientSession.loginSuccess(ctx, pkg);ChannelPipeline p = ctx.pipeline();//移除登录响应处理器p.remove(this);//在编码器后面动态插入心跳处理器p.addAfter("encoder", "heartbeat",new HeartBeatClientHandler());}}}
在登录成功之后,需要将LoginResponseHandler登录响应处理器实例从流水线上移除,因为不需要再处理登录响应了。同时,需要在客户端和服务端(即服务器器)之间开启定时的心跳处理。心跳是一个比较复杂的议题,后面会专门详细介绍客户端和服务器之间的心跳。
8.3.5 客户端流水线的装配
package com.crazymakercircle.imClient.client;//省略部分代码public class NettyClient {@Autowiredprivate ChatMsgHandler chatMsgHandler; //聊天消息处理器@Autowiredprivate LoginResponseHandler loginResponceHandler; //登录响应处理器//连接异步监听private GenericFutureListener<ChannelFuture> connectedListener;private Bootstrap b;private EventLoopGroup g;//省略部分代码public void doConnect() {try {b = new Bootstrap();//省略设置通道初始化参数b.handler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel ch) {ch.pipeline().addLast("decoder",new ProtobufDecoder());ch.pipeline().addLast("encoder",new ProtobufEncoder());ch.pipeline().addLast(loginResponseHandler);ch.pipeline().addLast(chatMsgHandler);ch.pipeline().addLast("exception",new ExceptionHandler());}});log.info("客户端开始连接 [疯狂创客圈IM]");ChannelFuture f = b.connect();f.addListener(connectedListener);} catch (Exception e) {log.info("客户端连接失败!" + e.getMessage());}}//…}
8.4 服务端的登录响应的实战案例
服务端的登录处理流程是:
- ProtobufDecoder解码器把请求ByteBuf数据包解码成Protobuf数据包。
- UserLoginRequestHandler登录处理器负责处理Protobuf数据包,进行一些必要的判断和预处理后,启动LoginProcesser登录业务处理器,以异步方式进行登录验证处理。
- LoginProcesser通过数据库或者远程接口完成用户验证,根据验证处理的结果生成登录成功/失败的登录响应报文,并发送给客户端。
8.4.1 服务端流水线的装配
```java package com.crazymakercircle.imServer.server; //… public class ChatServer {//…@Autowiredprivate LoginRequestHandler loginRequestHandler; //登录请求处理器@Autowiredprivate ServerExceptionHandler serverExceptionHandler; //服务器异常处理器public void run() {try {//省略Bootstrap的配置选项//5 装配流水线b.childHandler(new ChannelInitializer<SocketChannel>() {//有连接到达时会创建一个子通道protected void initChannel(SocketChannel ch) …{//装配子通道流水线中的Handler业务处理器
<a name="Z1uiS"></a>## 8.4.2 LoginRequestHandler```javapackage com.crazymakercircle.imServer.handler;//…@Slf4j@Service("LoginRequestHandler")@ChannelHandler.Sharablepublic class LoginRequestHandler extends ChannelInboundHandlerAdapter {@AutowiredLoginProcesser loginProcesser;public void channelRead(ChannelHandlerContext ctx, Object msg) …{if (null == msg || !(msg instanceofProtoMsg.Message)) {super.channelRead(ctx, msg);return;}ProtoMsg.Message pkg = (ProtoMsg.Message) msg;//取得请求类型ProtoMsg.HeadType headType = pkg.getType();if (!headType.equals(loginProcesser.type())) {super.channelRead(ctx, msg);return;}ServerSession session = new ServerSession(ctx.channel());//异步任务,处理登录的逻辑CallbackTaskScheduler.add(new CallbackTask<Boolean>() {@Overridepublic Boolean execute()…{boolean r = loginProcesser.action(session, pkg);return r;}//异步任务返回@Overridepublic void onBack(Boolean r) {if (r) {ctx.pipeline().remove(LoginRequestHandler.this);log.info("登录成功:" + session.getUser());} else {ServerSession.closeSession(ctx);log.info("登录失败:" + session.getUser());
8.4.3 LoginProcesser
package com.crazymakercircle.imServer.processer;//…@Slf4j@Service("LoginProcesser")public class LoginProcesser extends AbstractServerProcesser {@AutowiredLoginResponseBuilderloginResponseBuilder;@Overridepublic ProtoMsg.HeadTypetype() {return ProtoMsg.HeadType.LOGIN_REQUEST;}@Overridepublic boolean action(ServerSession session,ProtoMsg.Message proto){//取出token验证ProtoMsg.LoginRequest info = proto.getLoginRequest();long seqNo = proto.getSequence();User user = User.fromMsg(info);//检查用户booleanisValidUser = checkUser(user);if (!isValidUser) {ProtoInstant.ResultCodeEnum resultcode =ProtoInstant.ResultCodeEnum.NO_TOKEN;//生成登录失败的报文ProtoMsg.Message response =loginResponseBuilder.loginResponse(resultcode, seqNo, "-1");//发送登录失败的报文session.writeAndFlush(response);return false;}session.setUser(user);session.bind();//登录成功ProtoInstant.ResultCodeEnum resultcode =ProtoInstant.ResultCodeEnum.SUCCESS;//生成登录成功的报文ProtoMsg.Message response = loginResponseBuilder.loginResponse(resultcode, seqNo, session.getSessionId());//发送登录成功的报文session.writeAndFlush(response);return true;}private booleancheckUser(User user) {if (SessionMap.inst().hasLogin(user)) {return false;}//验证用户,比较耗时的操作,需要200毫秒以上的时间甚至更多//方法1:调用远程用户RESTful校验服务//方法2:调用数据库接口校验return true;}}
8.4.4 EventLoop线程和业务线程相互隔离
创建Netty的EventLoopGroup线程池,专用于处理耗时任务。 ```java //创建一个独立的线程池,假定有32条线程
EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32); final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器 ChannelInitializer i = new ChannelInitializer
(){ protected void initChannel(EmbeddedChannel ch){//处理器加入通道时,从专用threadGroup池中绑定一条线程ch.pipeline().addLast(threadGroup,handler);}
};
2. 创建一个专门的Java线程池,专用于处理耗时任务。```javapackage com.crazymakercircle.cocurrent;//…public class FutureTaskScheduler extends Thread{//方法二是使用自建的线程池时专用于处理耗时操作private static final ExecutorService POOL=Executors.newFixedThreadPool(10);//添加耗时任务public static void add(Runnable executeTask){POOL.submit(executeTask);}}
8.5 详解Session服务器会话
8.5.1 通道的容器属性

- AttributeKey不是原始的键(如Map中的键),而是一个键的包装类。AttributeKey确保了键的唯一性,在单个Netty应用中,AttributeKey必须唯一。
- 这里的Attribute值不是原始的值(如Map中的值),也是值的包装类。原始的值就放置在Attribute包装实例中,可以通过Attribute包装类实现值的读取(get)和设置(set)。
在Netty中,接口AttributeMap的源代码如下:
package io.netty.util;public interface AttributeMap {<T> Attribute<T> attr(AttributeKey<T> key);}
- Attribute的设值
```java
//定义键
public static final AttributeKey
SESSION_KEY = AttributeKey.valueOf(“SESSION_KEY”); //… //通过设置将会话绑定到通道 channel.attr(SESSION_KEY).set(session);
AttributeKey的创建需要用到静态方法AttributeKey.valueOf(String)方法。该方法的返回值为一个AttributeKey实例,其泛型参数为实际键-值对中值的实际类型。如果实际的值是ServerSession类型,则定义键的泛型参数为ServerSession,整个AttributeKey定义为AttributeKey<ServerSession>。```java//键的泛型形参是设置的值类型public static final AttributeKey<ServerSession> SESSION_KEY =AttributeKey.valueOf("SESSION_KEY");
创建完AttributeKey后,就可以通过通道完成键-值对的设值(set)、取值(get)了。常常使用链式调用,首先通过通道的attr(AttributeKey)方法取得value的包装类Attribute实例,然后通过Attribute的set()方法设置真正的值。在例子中,值是一个会话(Session)实例。
这里的AttributeKey一般定义为一个常量,需要提前定义;它的泛型参数是最终的Attribute的包装值value的数据类型。
- Attribute取值
```java
//取得Attribute实例
Attribute
attribute = ctx.channel().attr(SESSION_KEY); ServerSession session=attribute.get();
<a name="ag0PU"></a>## 8.5.2 ServerSession服务端会话类```javapackage com.crazymakercircle.imServer.server;//…public class ServerSession {public static final AttributeKey<ServerSession> SESSION_KEY =AttributeKey.valueOf("SESSION_KEY");private Channel channel; //通道private User user; //用户private final String sessionId;//会话唯一标识private boolean isLogin = false;//登录状态public ServerSession(Channel channel) {this.channel = channel;this.sessionId = buildNewSessionId();}//反向导航public static ServerSession getSession(ChannelHandlerContext ctx) {Channel channel = ctx.channel();return channel.attr(ServerSession.SESSION_KEY).get();}//和通道实现双向绑定public ServerSession bind() {log.info(" ServerSession绑定会话 " + channel.remoteAddress());channel.attr(ServerSession.SESSION_KEY).set(this);SessionMap.inst().addSession(getSessionId(), this);isLogin = true;return this;}//构造session idprivate static String buildNewSessionId() {String uuid = UUID.randomUUID().toString();return uuid.replaceAll("-", "");}//省略不是太重要的方法}
8.5.3 SessionMap会话管理器
这里使用一个会话容器SessionMap,负责管理服务端所有的ServerSession,其内部使用一个线程安全的ConcurrentHashMap类型的映射成员,保持sessionId到服务端ServerSession的映射。
package com.crazymakercircle.imServer.server;//…public final class SessionMap {private ConcurrentHashMap<String, ServerSession> map =new ConcurrentHashMap<String, ServerSession>();//增加会话对象public void addSession(String sessionId, ServerSession s) {map.put(sessionId, s);log.info("用户登录:id= " + s.getUser().getUid()+ " 在线总数: " + map.size());}//获取会话对象public ServerSession getSession(String sessionId) {if (map.containsKey(sessionId)) {return map.get(sessionId);} else {return null;}}//省略不是太重要的方法}
8.6 点对点单聊的实战案例
8.6.1 单聊的端到端流程
8.6.2 客户端的ChatConsoleCommand收集聊天内容
聊天消息收集类ChatConsoleCommand负责从控制台Scanner实例收集用户输入的聊天消息(格式为id:message),代码如下:
package com.crazymakercircle.imClient.command;//…@Data@Service("ChatConsoleCommand")public class ChatConsoleCommand implements BaseCommand {private String toUserId; //目标用户id(这里为登录的用户名称)private String message; //聊天内容public static final String KEY = "2";@Overridepublic void exec(Scanner scanner) {System.out.print("请输入聊天的消息(id:message):");String[] info = null;while (true) {String input = scanner.next();info = input.split(":");if (info.length != 2) {System.out.println("请输入聊天的消息(id:message):");}else {break;}}toUserId = info[0];message = info[1];}//…}
8.6.3 客户端的CommandController发送POJO
ChatConsoleCommand的调用者是CommandController命令控制类,该控制类在收集完成聊天内容和目标用户后,在自己的startOneChat()方法中调用ChatSender发送实例,将聊天消息组装成Protobuf数据包,通过客户端的通道发往服务端。
package com.crazymakercircle.imClient.client;//…public class CommandController {@AutowiredChatConsoleCommand chatConsoleCommand; //聊天命令收集器实例//省略其他成员public void startCommandThread()throws InterruptedException {Thread.currentThread().setName("命令线程");while (true) {//建立连接while (connectFlag == false) {//开始连接startConnectServer();waitCommandThread();}//处理命令while (null != session ) {Scanner scanner = new Scanner(System.in);clientCommandMenu.exec(scanner);String key = clientCommandMenu.getCommandInput();BaseCommand command = commandMap.get(key);//…switch (key) {case ChatConsoleCommand.KEY:command.exec(scanner);startOneChat((ChatConsoleCommand) command);break;//省略其他命令}}}}//发送单聊消息private void startOneChat(ChatConsoleCommand c) {chatSender.setSession(session);chatSender.setUser(user);chatSender.sendChatMsg(c.getToUserId(), c.getMessage());}//省略其他的命令处理}
8.6.4 服务端的ChatRedirectHandler进行消息转发
服务端收到聊天消息后会进行消息的转发,主要由消息转发处理器ChatRedirectHandler负责,其大致的工作如下:
- 对消息类型进行判断:判断是否为聊天请求Protobuf数据包。如果不是,通过调用super.channelRead(ctx, msg)将消息交给流水线的下一站。
- 对消息发送方用户登录进行判断:如果没有登录,则不能发送消息。
开启异步的消息转发,由其ChatRedirectProcesser实例负责完成消息转发。
package com.crazymakercircle.imServer.handler;//…public class ChatRedirectHandler extends ChannelInboundHandlerAdapter {@AutowiredChatRedirectProcesserchatRedirectProcesser;public void channelRead(ChannelHandlerContext ctx, Object msg) …{//判断消息实例if (null == msg || !(msg instanceofProtoMsg.Message)) {super.channelRead(ctx, msg);return;}//判断消息类型ProtoMsg.Message pkg = (ProtoMsg.Message) msg;ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();if (!headType.equals(chatRedirectProcesser.type())) {super.channelRead(ctx, msg);return;}//判断是否登录ServerSession session = ServerSession.getSession(ctx);if (null == session || !session.isLogin()) {log.error("用户尚未登录,不能发送消息");return;}//异步处理IM消息转发的逻辑FutureTaskScheduler.add(() ->{chatRedirectProcesser.action(session, pkg);});}}
8.6.5 服务端的ChatRedirectProcesser进行异步消息转发
ChatRedirectProcesser异步消息转发类负责将消息发送到目标用户,这是一个异步执行的任务,其大致功能如下:
根据目标用户ID找出所有的服务端的会话列表。
- 为每一个会话转发一份消息
```java
package com.crazymakercircle.imServer.processer;
//…
public class ChatRedirectProcesser extends AbstractServerProcesser {
@Override
public ProtoMsg.HeadTypetype() {
} @Override public boolean action(ServerSessionfromSession,return ProtoMsg.HeadType.MESSAGE_REQUEST;
} } //由于一个用户可能有多个会话,因此需要通过调用SessionMap会话管理器的SessionMap.inst().getSessionsBy(uid)方法来取得这个用户的所有会话。 package com.crazymakercircle.imServer.server; //… @Slf4j @Data public final class SessionMap { //全部的会话映射 “uid->session” private ConcurrentHashMapProtoMsg.Message proto) {//聊天处理ProtoMsg.MessageRequest msg = proto.getMessageRequest();//获取接收方的chatIDString to = msg.getTo();List<ServerSession> toSessions =SessionMap.inst().getSessionsBy(to);if (toSessions == null) {//接收方离线,这里一般会做离线消息处理Print.tcfo("[" + to + "] 不在线,发送失败!");} else {toSessions.forEach((session) -> {//将IM消息发送到每一个接收方的通道session.writeAndFlush(proto);});}return true;
map = new ConcurrentHashMap<String, ServerSession>();
//根据用户id获取会话集合
public List
<a name="i6zBf"></a>## 8.6.6 客户端的ChatMsgHandler聊天消息处理器1. 对消息类型进行判断,判断是否为聊天请求Protobuf数据包。如果不是,通过super.channelRead(ctx, msg)将消息交给流水线的下一站。1. 如果是聊天消息,就将聊天消息显示在控制台。```javapackage com.crazymakercircle.imClient.handler;//…public class ChatMsgHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) …{//判断类型ProtoMsg.Message pkg = (ProtoMsg.Message) msg;ProtoMsg.HeadType headType = pkg.getType();if (!headType.equals(ProtoMsg.HeadType.MESSAGE_REQUEST)) {super.channelRead(ctx, msg);return; //不是聊天消息}ProtoMsg.MessageRequest req = pkg.getMessageRequest();String content = req.getContent();String uid = req.getFrom();System.out.println(" 收到消息 from uid:" + uid + " -> " + content);}}
8.7 详解心跳检测
8.7.1 网络连接的假死现象
什么是连接假死呢?如果底层的TCP连接(socket连接)已经断开,但是服务端并没有正常关闭套接字,服务端认为这条TCP连接仍然是存在的,则该连接处于“假死”状态。连接假死的具体表现如下:
- 在服务端,会有一些处于TCP_ESTABLISHED状态的“正常”连接。
- 在客户端,TCP客户端显示连接已经断开。
- 虽然客户端可以进行断线重连操作,但是上一次的连接状态依然被服务端认为有效,并且服务端的资源得不到正确释放,包括套接字上下文以及接收/发送缓冲区。
连接假死通常是由以下多个原因造成的,例如:
- 应用程序出现线程堵塞,无法进行数据的读写。
- 网络相关的设备出现故障,例如网卡、机房故障。
- 网络丢包。公网环境非常容易出现丢包和网络抖动等现象。
8.7.2 服务端的空闲检测
空闲检测就是每隔一段时间检测子通道是否有数据读写,如果有,则子通道是正常的;如果没有,则子通道被判定为假死,关掉子通道。
服务端如何实现空闲检测呢?使用Netty自带的IdleStateHandler空闲状态处理器就可以实现这个功能。下面的示例程序继承自IdleStateHandler,定义一个假死处理类:
package com.crazymakercircle.imServer.handler;//…public class HeartBeatServerHandler extends IdleStateHandler {private static final int READ_IDLE_GAP = 150; //最大空闲,单位秒public HeartBeatServerHandler() {super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);}@Overrideprotected void channelIdle(ChannelHandlerContext ctx,IdleStateEventevt) …{System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");ServerSession.closeSession(ctx);}public void channelRead(ChannelHandlerContext ctx, Object msg){//…ProtoMsg.Message pkg = (ProtoMsg.Message) msg;//判断和处理心跳数据包ProtoMsg.HeadType headType = pkg.getType();if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {//异步处理,将心跳数据包直接回复给客户端FutureTaskScheduler.add(() -> {if (ctx.channel().isActive()) {ctx.writeAndFlush(msg);}});}super.channelRead(ctx, msg);}}
在HeartBeatServerHandler的构造函数中,调用了基类IdleStateHandler的构造函数,传递了四个参数:
public HeartBeatServerHandler() {super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);}
其中,第一个参数表示入站(Inbound)空闲时长,指的是一段时间内如果没有数据入站,就判定连接假死;第二个参数是出站(Outbound)空闲时长,指的是一段时间内如果没有数据出站,就判定连接假死;第三个参数是出/入站检测时长,表示在一段时间内如果没有出站或者入站,就判定连接假死;最后一个参数表示时间单位,TimeUnit.SECONDS表示秒。<br />假死被判定之后,IdleStateHandler类会回调自己的channelIdle()方法。在这个子类的重写版本中,重写了空闲回调方法,手动关闭连接。
@Overrideprotected void channelIdle(ChannelHandlerContext ctx,IdleStateEventevt) …{System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");ServerSession.closeSession(ctx);}
8.7.3 客户端的心跳发送
与服务端的空闲检测相配合,客户端需要定期发送数据包到服务端,通常这个数据包称为心跳数据包。接下来,定义一个Handler业务处理器定期发送心跳数据包给服务端。
package com.crazymakercircle.imClient.handler;//…public class HeartBeatClientHandlerextends ChannelInboundHandlerAdapter {//心跳的时间间隔,单位为秒private static final int HEARTBEAT_INTERVAL = 50;//在Handler业务处理器被加入到流水线时,开始发送心跳数据包@Overridepublic void handlerAdded(ChannelHandlerContext ctx) …{ClientSession session = ClientSession.getSession(ctx);User user = session.getUser();HeartBeatMsgBuilder builder =new HeartBeatMsgBuilder(user, session);ProtoMsg.Message message = builder.buildMsg();//发送心跳数据包heartBeat(ctx, message);}//使用定时器,定期发送心跳数据包public void heartBeat(ChannelHandlerContext ctx,ProtoMsg.MessageheartbeatMsg) {//提交一个一次性的定时任务ctx.executor().schedule(() -> {if (ctx.channel().isActive()) {log.info(" 发送HEART_BEAT 消息to server");ctx.writeAndFlush(heartbeatMsg);//递归调用:提交下一个一次性的定时任务,发送下一次的心跳heartBeat(ctx, heartbeatMsg);}}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}//接收到服务器的心跳回写@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg){//判断类型ProtoMsg.Message pkg = (ProtoMsg.Message) msg;ProtoMsg.HeadType headType = pkg.getType();if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {log.info(" 收到回写的HEART_BEAT 消息 from server");return;} else {
HeartBeatClientHandler实例并不是一开始就装配到了流水线中的,它装配的时机是在登录成功之后。登录处理器LoginResponseHandler的相关代码如下:
package com.crazymakercircle.imClient.clientHandler;//…public class LoginResponseHandlerextends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)…{//省略登录数据包的预处理if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {//登录失败log.info(result.getDesc());} else {//登录成功//省略其他处理//在编码器后面动态插入心跳处理器ChannelPipeline p=ctx.pipeline();p.addAfter("encoder","heartbeat",new HeartBeatClientHandler());}}}
