一、实例要求
实例要求:使用 IDEA 创建 Netty 项目
- Netty 服务器在 6668 端口监听,客户端能发送消息给服务器”hello,服务器~”
- 服务器可以回复消息给客户端”hello,客户端~”
- 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
- @description:
- @Author: wangchao
@Date: 2021/12/6 */ public class NettyServer { public static void main(String[] args) throws InterruptedException {
// 创建 BossGroup 和 WorkerGroup// 说明// 1. 创建两个线程组 bossGroup 和 workerGroup// 2. bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成// 3. 两个都是无限循环// 4. bossgroup 和 workGroup 含有的子线程(NIOEventGroup)的个数 默认实际是cpu的核数NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);// 12核cpu,默认是24个线程 NettyRuntime.availableProcessors() * 2
// NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 方便测试,设置为 3 个线程NioEventLoopGroup workerGroup = new NioEventLoopGroup(3);try {// 创建服务器端的启用对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程来进行设置bootstrap.group(bossGroup, workerGroup)// 设置两个现场组.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器的通道实现.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接数.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道测试对象(匿名)// 给 pipeline 设置处理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 在管道的最后添加一个处理器(自定义的)ch.pipeline().addLast(new NettyServerHandler());}}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器System.out.println(".....服务器 is ready");// 绑定一个端口并且同步,生成一个 channelFuture 对象// 启动服务器并绑定端口ChannelFuture cf = bootstrap.bind(6677).sync();// 对关闭通道进行监听cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
NettyServerHandler
```java package com.supkingx.netty.simple;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;
/**
- 说明
- 1、我们自定义一个 Handler 需要继续netty 规定好的谋和 handlerAdapter(规范)
- 2、这时我们自定义一个Handler,才能称为一个handler *
- @description:
- @Author: wangchao
@Date: 2021/12/6 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 读取数据实际(这里我们可以继续读取客户端发送的消息)
/**
- @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
- @param msg 就是客户端发送的数据 默认Object
- @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(“服务器读取线程 “ + Thread.currentThread().getName()); System.out.println(“server ctx=” + ctx); System.out.println(“看看channel 和 pipeline的关系”); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline();
// 将 msg 转化成一个 ByteBuffer// ByteBuf 是 Netty 提供的,不是NIO 的 ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + ctx.channel().remoteAddress());}/*** 数据读取完毕* 可以在这里向通道写回数据** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 是 write + flush// 将数据写入到缓存并刷新// 一般讲,我们对发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello,client", CharsetUtil.UTF_8));}/*** 处理异常** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("服务端异常了");cause.printStackTrace();ctx.close();}
}
<a name="zUi9H"></a>## 2、客户端<a name="YGdkr"></a>### NettyClient```javapackage com.supkingx.netty.simple;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/*** @description:* @Author: wangchao* @Date: 2021/12/6*/public class NettyClient {public static void main(String[] args) throws InterruptedException {// 客户端需要一个事件循环跑NioEventLoopGroup group = new NioEventLoopGroup();try {// 创建客户端启动对象// 注意客户端使用的不是 serverBootstrap 而是 BootStrapBootstrap bootstrap = new Bootstrap();// 设置相关参数bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {// 在管道的最后添加一个处理器(自定义的)channel.pipeline().addLast(new NettyClientHandler());}});System.out.println("客户端。。ok");// 启动客户端去连接服务器端// 关于 channelFuture 要分析,涉及到 netty 的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6677).sync();// 给关闭通道进行监听channelFuture.channel().closeFuture().sync();} finally {// 优雅的关闭group.shutdownGracefully();}}}
NettyClientHandler
package com.supkingx.netty.simple;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import java.nio.charset.StandardCharsets;/*** @description:* @Author: wangchao* @Date: 2021/12/6*/public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 当通道就绪时,就会触发这个方法** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client " + ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!", StandardCharsets.UTF_8));}/*** 当通道有时间读取时,会触发** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));System.out.println("服务器的地址:" + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("客户端异常了");cause.printStackTrace();ctx.close();}}
三、源码简单分析
NioEventLoopGroup
1.默认线程数是0
�
2.进入这个方法内部一直深入,直到找到如下方法,可以看到当线程数=0的时候,返回一个默认的线程
3.默认线程如下,即当前CPU核心数(NettyRuntime.availableProcessors())*2
4.debug继续,由于我的电脑是12核心,所以这时的 NioEventLoopGroup 使用 24 线程,即有 24个children
children部分展示如下,有24个
验证workgroup每次都会启动一个新线程
- 这里我们启动了两个client,一个server,可以看到server使用了两个线程来对应每个客户端。

- 由于我们24个线程,启动24个客户端过于繁琐,这里将workGroup的线程数设置为3,便于测试。
可以看到,启动了4个客户端之后,workGroup会依次使用线程,第四个客户端会去使用第一个线程。
chanel 和 pipeline
通过下图可以简单的发现。
- channel中有pipeline,pipeline中有channel,这两个相互对应
- pipeline 是一个双向链表
四、上诉实例能不能再次优化?
1、缺陷
可以在 NettyServerHandler 类的 channelRead 方法中添加如下代码,可以发现当接受客户端的消息时,在这里被 阻塞 了,这样是不行的,本文将会介绍如何优化。
try {Thread.sleep(10 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常" + e.getMessage());}
2、简单优化
可以在 NettyServerHandler 类的 channelRead 中添加异步任务,即可解决阻塞问题。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中// 解决方案1 用哪个户程序自定义的普通任务ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(10 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常" + e.getMessage());}}});System.out.println("server....go");}
上述方案的异步原理是,将异步任务放到 ChannelHandlerContext(DefaultChannelHandlerContext
�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中,等待执行。
3、探究 taskQueue
疑问:上诉优化中,假如 channelRead 有两个异步任务,这两个异步任务会怎么执行?
例如:在 channelRead 方法中有两个线程,第一个任务阻塞 10s,第二个任务阻塞 20s,当程序运行,客户端发送消息过来时,会是怎么样的?
假如客户端 2:00:00 发送消息到客户端,两种情况 1、第一个任务 2:00:10执行,第二个任务2:00:20执行 2、第一个任务 2:00:10执行,第二个任务2:00:30执行
答案是 第二种情况,当有两个线程在 taskQueue 中时,这两个线程会顺序执行。代码如下
com.supkingx.netty.simple2.NettyServerHandler#channelRead
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中// 解决方案1 用哪个户程序自定义的普通任务ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(10 * 1000);String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常" + e.getMessage());}}});ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(20 * 1000);String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));} catch (InterruptedException e) {System.out.println("发生异常" + e.getMessage());}}});String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));System.out.println("server....go," + now);}
执行代码并验证
服务端在 2021-12-09 00:03:22 收到客户端的消息,由于是异步,所以先输出了 server….go
等待 10s至00:03:32时,第一个任务执行并向客户端发送消息,客户端收到消息
等待 30s至00:03:52时,第二个任务执行并向客户端发送消息,客户端收到消息
如上可验证结果。
总结
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket网络通道。
- NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
- ChannelHandlerContext(DefaultChannelHandlerContext
�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中的 taskQueue 是串行设计。

