类似于 java.nio 包 的 Channel,Netty 提供了自己的 Channel 和其子类实现,用于异步 I/O 操作 等。Unsafe 是 Channel 的内部接口,聚合在 Channel 中协助进行网络读写相关的操作,因为它的设计初衷就是 Channel 的内部辅助类,不应该被 Netty 框架 的上层使用者调用,所以被命名为 Unsafe。

Channel 组件

Netty 的 Channel 组件 是 Netty 对网络操作的封装如 网络数据的读写,与客户端建立连接,主动关闭连接 等,也包含了 Netty 框架 相关的一些功能,如 获取该 Chanel 的 EventLoop、ChannelPipeline 等。另外,Netty 并没有直接使用 java.nio 包 的 SocketChannel 和 ServerSocketChannel,而是使用 NioSocketChannel 和 NioServerSocketChannel 对其进行了进一步的封装。下面我们先从 Channel 接口 的 API 开始分析,然后看一下其重要子类的源码实现。

为了便于后面的阅读源码,我们先看下 NioSocketChannel 和 NioServerSocketChannel 的继承关系类图。 在这里插入图片描述

Channel 接口

  1. public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
  2. /**
  3. * Channel 需要注册到 EventLoop 的多路复用器上,用于处理 I/O事件,
  4. * EventLoop 实际上就是处理网络读写事件的 Reactor线程。
  5. */
  6. EventLoop eventLoop();
  7. /**
  8. * ChannelMetadata 封装了 TCP参数配置
  9. */
  10. ChannelMetadata metadata();
  11. /**
  12. * 对于服务端Channel而言,它的父Channel为空;
  13. * 对于客户端Channel,它的 父Channel 就是创建它的 ServerSocketChannel
  14. */
  15. Channel parent();
  16. /**
  17. * 每个 Channel 都有一个全局唯一标识
  18. */
  19. ChannelId id();
  20. /**
  21. * 获取当前 Channel 的配置信息,如 CONNECT_TIMEOUT_MILLIS
  22. */
  23. ChannelConfig config();
  24. /**
  25. * 当前 Channel 是否已经打开
  26. */
  27. boolean isOpen();
  28. /**
  29. * 当前 Channel 是否已注册进 EventLoop
  30. */
  31. boolean isRegistered();
  32. /**
  33. * 当前 Channel 是否已激活
  34. */
  35. boolean isActive();
  36. /**
  37. * 当前 Channel 的本地绑定地址
  38. */
  39. SocketAddress localAddress();
  40. /**
  41. * 当前 Channel 的远程绑定地址
  42. */
  43. SocketAddress remoteAddress();
  44. /**
  45. * 当前 Channel 是否可写
  46. */
  47. boolean isWritable();
  48. /**
  49. * 当前 Channel 内部的 Unsafe对象
  50. */
  51. Unsafe unsafe();
  52. /**
  53. * 当前 Channel 持有的 ChannelPipeline
  54. */
  55. ChannelPipeline pipeline();
  56. /**
  57. * 从当前 Channel 中读取数据到第一个 inbound缓冲区 中,如果数据被成功读取,
  58. * 触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。
  59. * 读取操作API调用完成之后,紧接着会触发ChannelHandler.channelReadComplete(ChannelHandlerContext)事件,
  60. * 这样业务的ChannelHandler可以决定是否需要继续读取数据。如果己经有读操作请求被挂起,则后续的读操作会被忽略。
  61. */
  62. @Override
  63. Channel read();
  64. /**
  65. * 将之前写入到发送环形数组中的消息全部写入到目标Chanel中,发送给通信对方
  66. */
  67. @Override
  68. Channel flush();
  69. }

AbstractChannel

  1. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  2. // 父Channel
  3. private final Channel parent;
  4. // Channel的全局唯一标识
  5. private final ChannelId id;
  6. // 内部辅助类 Unsafe
  7. private final Unsafe unsafe;
  8. // Netty 会为每一个 channel 创建一个 pipeline
  9. private final DefaultChannelPipeline pipeline;
  10. // 本地地址
  11. private volatile SocketAddress localAddress;
  12. // 远程主机地址
  13. private volatile SocketAddress remoteAddress;
  14. // 注册到了哪个 EventLoop 上
  15. private volatile EventLoop eventLoop;
  16. // 是否已注册
  17. private volatile boolean registered;
  18. /**
  19. * channnel 会将 网络IO操作 触发到 ChannelPipeline 对应的事件方法。
  20. * Netty 基于事件驱动,我们也可以理解为当 Chnanel 进行 IO操作 时会产生对应的IO 事件,
  21. * 然后驱动事件在 ChannelPipeline 中传播,由对应的 ChannelHandler 对事件进行拦截和处理,
  22. * 不关心的事件可以直接忽略
  23. */
  24. @Override
  25. public ChannelFuture bind(SocketAddress localAddress) {
  26. return pipeline.bind(localAddress);
  27. }
  28. @Override
  29. public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
  30. return pipeline.bind(localAddress, promise);
  31. }
  32. @Override
  33. public ChannelFuture connect(SocketAddress remoteAddress) {
  34. return pipeline.connect(remoteAddress);
  35. }
  36. @Override
  37. public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
  38. return pipeline.connect(remoteAddress, localAddress);
  39. }
  40. @Override
  41. public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
  42. return pipeline.connect(remoteAddress, promise);
  43. }
  44. @Override
  45. public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  46. return pipeline.connect(remoteAddress, localAddress, promise);
  47. }
  48. @Override
  49. public ChannelFuture disconnect() {
  50. return pipeline.disconnect();
  51. }
  52. @Override
  53. public ChannelFuture disconnect(ChannelPromise promise) {
  54. return pipeline.disconnect(promise);
  55. }
  56. @Override
  57. public ChannelFuture close() {
  58. return pipeline.close();
  59. }
  60. @Override
  61. public ChannelFuture close(ChannelPromise promise) {
  62. return pipeline.close(promise);
  63. }
  64. @Override
  65. public ChannelFuture deregister() {
  66. return pipeline.deregister();
  67. }
  68. @Override
  69. public ChannelFuture deregister(ChannelPromise promise) {
  70. return pipeline.deregister(promise);
  71. }
  72. @Override
  73. public Channel flush() {
  74. pipeline.flush();
  75. return this;
  76. }
  77. @Override
  78. public Channel read() {
  79. pipeline.read();
  80. return this;
  81. }
  82. @Override
  83. public ChannelFuture write(Object msg) {
  84. return pipeline.write(msg);
  85. }
  86. @Override
  87. public ChannelFuture write(Object msg, ChannelPromise promise) {
  88. return pipeline.write(msg, promise);
  89. }
  90. @Override
  91. public ChannelFuture writeAndFlush(Object msg) {
  92. return pipeline.writeAndFlush(msg);
  93. }
  94. @Override
  95. public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
  96. return pipeline.writeAndFlush(msg, promise);
  97. }
  98. }

AbstractNioChannel

  1. public abstract class AbstractNioChannel extends AbstractChannel {
  2. // AbstractNioChannel 是 NioSocketChannel和NioServerSocketChannel 的公共父类,所以定义
  3. // 了一个 java.nio 的 SocketChannel 和 ServerSocketChannel 的公共父类 SelectableChannel,
  4. // 用于设置 SelectableChannel参数 和进行 IO操作
  5. private final SelectableChannel ch;
  6. // 它代表了 JDK 的 SelectionKey.OP_READ
  7. protected final int readInterestOp;
  8. // 该 SelectionKey 是 Channel 注册到 EventLoop 后返回的,
  9. // 由于 Channel 会面临多个业务线程的并发写操作,当 SelectionKey 被修改了,
  10. // 需要让其他业务线程感知到变化,所以使用volatile保证修改的可见性
  11. volatile SelectionKey selectionKey;
  12. /**
  13. * Channel 的注册
  14. */
  15. @Override
  16. protected void doRegister() throws Exception {
  17. boolean selected = false;
  18. for (;;) {
  19. try {
  20. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  21. return;
  22. } catch (CancelledKeyException e) {
  23. if (!selected) {
  24. // Force the Selector to select now as the "canceled" SelectionKey may still be
  25. // cached and not removed because no Select.select(..) operation was called yet.
  26. eventLoop().selectNow();
  27. selected = true;
  28. } else {
  29. // We forced a select operation on the selector before but the SelectionKey is still cached
  30. // for whatever reason. JDK bug ?
  31. throw e;
  32. }
  33. }
  34. }
  35. }
  36. protected SelectableChannel javaChannel() {
  37. return ch;
  38. }
  39. @Override
  40. protected void doBeginRead() throws Exception {
  41. // Channel.read() 或 ChannelHandlerContext.read() 被调用
  42. final SelectionKey selectionKey = this.selectionKey;
  43. if (!selectionKey.isValid()) {
  44. return;
  45. }
  46. readPending = true;
  47. final int interestOps = selectionKey.interestOps();
  48. if ((interestOps & readInterestOp) == 0) {
  49. selectionKey.interestOps(interestOps | readInterestOp);
  50. }
  51. }
  52. }

NioServerSocketChannel

  1. public class NioServerSocketChannel extends AbstractNioMessageChannel
  2. implements io.netty.channel.socket.ServerSocketChannel {
  3. // java.nio 包的内容,用于获取 java.nio.channels.ServerSocketChannel 实例
  4. private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
  5. private static ServerSocketChannel newSocket(SelectorProvider provider) {
  6. try {
  7. /**
  8. * 获取的是 java.nio.channels.ServerSocketChannel 实例
  9. */
  10. return provider.openServerSocketChannel();
  11. } catch (IOException e) {
  12. throw new ChannelException("Failed to open a server socket.", e);
  13. }
  14. }
  15. /**
  16. * Create a new instance
  17. */
  18. public NioServerSocketChannel() {
  19. this(newSocket(DEFAULT_SELECTOR_PROVIDER));
  20. }
  21. /**
  22. * 在父类中完成了 非阻塞IO的配置,及事件的注册
  23. */
  24. public NioServerSocketChannel(ServerSocketChannel channel) {
  25. super(null, channel, SelectionKey.OP_ACCEPT);
  26. config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  27. }
  28. /**
  29. * 对 NioServerSocketChannel 来说,它的读取操作就是接收客户端的连接,创建 NioSocketChannel对象
  30. */
  31. @Override
  32. protected int doReadMessages(List<Object> buf) throws Exception {
  33. // 首先通过 ServerSocketChannel 的 accept()方法 接收新的客户端连接,
  34. // 获取 java.nio.channels.SocketChannel 对象
  35. SocketChannel ch = SocketUtils.accept(javaChannel());
  36. try {
  37. // 如果获取到客户端连接对象 SocketChannel,则利用当前的 NioServerSocketChannel、EventLoop
  38. // 和 SocketChannel 创建新的 NioSocketChannel,并添加到 buf 中
  39. if (ch != null) {
  40. buf.add(new NioSocketChannel(this, ch));
  41. return 1;
  42. }
  43. } catch (Throwable t) {
  44. logger.warn("Failed to create a new channel from an accepted socket.", t);
  45. try {
  46. ch.close();
  47. } catch (Throwable t2) {
  48. logger.warn("Failed to close a socket.", t2);
  49. }
  50. }
  51. return 0;
  52. }
  53. }

NioSocketChannel

  1. public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
  2. // 与 NioServerSocketChannel 一样,也依赖了 java.nio包 的API
  3. private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
  4. /**
  5. * 从这里可以看出,NioSocketChannel 对 java.nio.channels.SocketChannel 做了进一步封装
  6. * 使其 适用于 Netty框架
  7. */
  8. private static SocketChannel newSocket(SelectorProvider provider) {
  9. try {
  10. return provider.openSocketChannel();
  11. } catch (IOException e) {
  12. throw new ChannelException("Failed to open a socket.", e);
  13. }
  14. }
  15. /**
  16. * Create a new instance
  17. */
  18. public NioSocketChannel() {
  19. this(DEFAULT_SELECTOR_PROVIDER);
  20. }
  21. public NioSocketChannel(SelectorProvider provider) {
  22. this(newSocket(provider));
  23. }
  24. public NioSocketChannel(SocketChannel socket) {
  25. this(null, socket);
  26. }
  27. public NioSocketChannel(Channel parent, SocketChannel socket) {
  28. // 在父类中完成 非阻塞IO的配置,注册事件
  29. super(parent, socket);
  30. config = new NioSocketChannelConfig(this, socket.socket());
  31. }
  32. @Override
  33. protected SocketChannel javaChannel() {
  34. return (SocketChannel) super.javaChannel();
  35. }
  36. @Override
  37. public boolean isActive() {
  38. SocketChannel ch = javaChannel();
  39. return ch.isOpen() && ch.isConnected();
  40. }
  41. /**
  42. * 与远程服务器建立连接
  43. */
  44. @Override
  45. protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
  46. if (localAddress != null) {
  47. doBind0(localAddress);
  48. }
  49. boolean success = false;
  50. try {
  51. // 根据远程地址建立TCP连接,对连接结果进行判断
  52. boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
  53. if (!connected) {
  54. selectionKey().interestOps(SelectionKey.OP_CONNECT);
  55. }
  56. success = true;
  57. return connected;
  58. } finally {
  59. if (!success) {
  60. doClose();
  61. }
  62. }
  63. }
  64. /**
  65. * 关闭 Channel
  66. */
  67. @Override
  68. protected void doClose() throws Exception {
  69. super.doClose();
  70. javaChannel().close();
  71. }
  72. /**
  73. * 从 Channel 中读取数据
  74. */
  75. @Override
  76. protected int doReadBytes(ByteBuf byteBuf) throws Exception {
  77. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  78. allocHandle.attemptedBytesRead(byteBuf.writableBytes());
  79. return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
  80. }
  81. @Override
  82. protected int doWriteBytes(ByteBuf buf) throws Exception {
  83. final int expectedWrittenBytes = buf.readableBytes();
  84. return buf.readBytes(javaChannel(), expectedWrittenBytes);
  85. }
  86. /**
  87. * 向 Channel 中写数据
  88. */
  89. @Override
  90. protected void doWrite(ChannelOutboundBuffer in) throws Exception {
  91. SocketChannel ch = javaChannel();
  92. int writeSpinCount = config().getWriteSpinCount();
  93. do {
  94. if (in.isEmpty()) {
  95. // All written so clear OP_WRITE
  96. clearOpWrite();
  97. // Directly return here so incompleteWrite(...) is not called.
  98. return;
  99. }
  100. // Ensure the pending writes are made of ByteBufs only.
  101. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
  102. ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
  103. int nioBufferCnt = in.nioBufferCount();
  104. // Always us nioBuffers() to workaround data-corruption.
  105. // See https://github.com/netty/netty/issues/2761
  106. switch (nioBufferCnt) {
  107. case 0:
  108. // We have something else beside ByteBuffers to write so fallback to normal writes.
  109. writeSpinCount -= doWrite0(in);
  110. break;
  111. case 1: {
  112. // Only one ByteBuf so use non-gathering write
  113. // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
  114. // to check if the total size of all the buffers is non-zero.
  115. ByteBuffer buffer = nioBuffers[0];
  116. int attemptedBytes = buffer.remaining();
  117. final int localWrittenBytes = ch.write(buffer);
  118. if (localWrittenBytes <= 0) {
  119. incompleteWrite(true);
  120. return;
  121. }
  122. adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
  123. in.removeBytes(localWrittenBytes);
  124. --writeSpinCount;
  125. break;
  126. }
  127. default: {
  128. // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
  129. // to check if the total size of all the buffers is non-zero.
  130. // We limit the max amount to int above so cast is safe
  131. long attemptedBytes = in.nioBufferSize();
  132. final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
  133. if (localWrittenBytes <= 0) {
  134. incompleteWrite(true);
  135. return;
  136. }
  137. // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
  138. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
  139. maxBytesPerGatheringWrite);
  140. in.removeBytes(localWrittenBytes);
  141. --writeSpinCount;
  142. break;
  143. }
  144. }
  145. } while (writeSpinCount > 0);
  146. incompleteWrite(writeSpinCount < 0);
  147. }
  148. }

Unsafe 功能简介

Unsafe 接口 实际上是 Channel 接口 的辅助接口,它不应该被用户代码直接调用。实际的 IO 读写操作 都是由 Unsafe 接口 负责完成的

  1. public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
  2. interface Unsafe {
  3. /**
  4. * 返回绑定的 本地地址
  5. */
  6. SocketAddress localAddress();
  7. /**
  8. * 返回绑定的 远程地址
  9. */
  10. SocketAddress remoteAddress();
  11. /**
  12. * 将 Channel 注册到 EventLoop 上
  13. */
  14. void register(EventLoop eventLoop, ChannelPromise promise);
  15. /**
  16. * 绑定 本地地址 到 Channel 上
  17. */
  18. void bind(SocketAddress localAddress, ChannelPromise promise);
  19. /**
  20. * 连接到远程服务器
  21. */
  22. void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
  23. /**
  24. * 断开连接
  25. */
  26. void disconnect(ChannelPromise promise);
  27. /**
  28. * 关闭 Channel
  29. */
  30. void close(ChannelPromise promise);
  31. /**
  32. * 读就绪 网络事件
  33. */
  34. void beginRead();
  35. /**
  36. * 发送数据
  37. */
  38. void write(Object msg, ChannelPromise promise);
  39. /**
  40. * 将缓冲区的数据 刷到 Channel
  41. */
  42. void flush();
  43. }
  44. }

AbstractUnsafe

  1. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  2. protected abstract class AbstractUnsafe implements Unsafe {
  3. /**
  4. * 将当前 Unsafe 对应的 Channel 注册到 EventLoop 的多路复用器上,
  5. * 然后调用 DefaultChannelPipeline 的 fireChannelRegistered()方法,
  6. * 如果 Channel 被激活 则调用 DefaultChannelPipeline 的 fireChannelActive()方法
  7. */
  8. @Override
  9. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  10. if (eventLoop == null) {
  11. throw new NullPointerException("eventLoop");
  12. }
  13. if (isRegistered()) {
  14. promise.setFailure(new IllegalStateException("registered to an event loop already"));
  15. return;
  16. }
  17. if (!isCompatible(eventLoop)) {
  18. promise.setFailure(
  19. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
  20. return;
  21. }
  22. AbstractChannel.this.eventLoop = eventLoop;
  23. if (eventLoop.inEventLoop()) {
  24. register0(promise);
  25. } else {
  26. try {
  27. eventLoop.execute(new Runnable() {
  28. @Override
  29. public void run() {
  30. register0(promise);
  31. }
  32. });
  33. } catch (Throwable t) {
  34. logger.warn(
  35. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
  36. AbstractChannel.this, t);
  37. closeForcibly();
  38. closeFuture.setClosed();
  39. safeSetFailure(promise, t);
  40. }
  41. }
  42. }
  43. private void register0(ChannelPromise promise) {
  44. try {
  45. // check if the channel is still open as it could be closed in the mean time when the register
  46. // call was outside of the eventLoop
  47. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  48. return;
  49. }
  50. boolean firstRegistration = neverRegistered;
  51. doRegister();
  52. neverRegistered = false;
  53. registered = true;
  54. // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
  55. // user may already fire events through the pipeline in the ChannelFutureListener.
  56. pipeline.invokeHandlerAddedIfNeeded();
  57. safeSetSuccess(promise);
  58. pipeline.fireChannelRegistered();
  59. // Only fire a channelActive if the channel has never been registered. This prevents firing
  60. // multiple channel actives if the channel is deregistered and re-registered.
  61. if (isActive()) {
  62. if (firstRegistration) {
  63. pipeline.fireChannelActive();
  64. } else if (config().isAutoRead()) {
  65. // This channel was registered before and autoRead() is set. This means we need to begin read
  66. // again so that we process inbound data.
  67. //
  68. // See https://github.com/netty/netty/issues/4805
  69. beginRead();
  70. }
  71. }
  72. } catch (Throwable t) {
  73. // Close the channel directly to avoid FD leak.
  74. closeForcibly();
  75. closeFuture.setClosed();
  76. safeSetFailure(promise, t);
  77. }
  78. }
  79. /**
  80. * 绑定指定的端口,对于服务端 用于绑定监听端口,
  81. * 对于客户端,主要用于指定 客户端Channel 的本地绑定Socket地址。
  82. */
  83. @Override
  84. public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
  85. assertEventLoop();
  86. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  87. return;
  88. }
  89. // See: https://github.com/netty/netty/issues/576
  90. if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
  91. localAddress instanceof InetSocketAddress &&
  92. !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
  93. !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
  94. // Warn a user about the fact that a non-root user can't receive a
  95. // broadcast packet on *nix if the socket is bound on non-wildcard address.
  96. logger.warn(
  97. "A non-root user can't receive a broadcast packet if the socket " +
  98. "is not bound to a wildcard address; binding to a non-wildcard " +
  99. "address (" + localAddress + ") anyway as requested.");
  100. }
  101. boolean wasActive = isActive();
  102. try {
  103. doBind(localAddress);
  104. } catch (Throwable t) {
  105. safeSetFailure(promise, t);
  106. closeIfClosed();
  107. return;
  108. }
  109. if (!wasActive && isActive()) {
  110. invokeLater(new Runnable() {
  111. @Override
  112. public void run() {
  113. pipeline.fireChannelActive();
  114. }
  115. });
  116. }
  117. safeSetSuccess(promise);
  118. }
  119. /**
  120. * 客户端 或 服务端,主动关闭连接
  121. */
  122. @Override
  123. public final void disconnect(final ChannelPromise promise) {
  124. assertEventLoop();
  125. if (!promise.setUncancellable()) {
  126. return;
  127. }
  128. boolean wasActive = isActive();
  129. try {
  130. doDisconnect();
  131. } catch (Throwable t) {
  132. safeSetFailure(promise, t);
  133. closeIfClosed();
  134. return;
  135. }
  136. if (wasActive && !isActive()) {
  137. invokeLater(new Runnable() {
  138. @Override
  139. public void run() {
  140. pipeline.fireChannelInactive();
  141. }
  142. });
  143. }
  144. safeSetSuccess(promise);
  145. closeIfClosed(); // doDisconnect() might have closed the channel
  146. }
  147. /**
  148. * 在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚
  149. * 未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成Runnable稍后再执行
  150. */
  151. @Override
  152. public final void close(final ChannelPromise promise) {
  153. assertEventLoop();
  154. close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
  155. }
  156. /**
  157. * 本方法实际上将消息添加到环形发送数组中,并不是真正的写Channel
  158. */
  159. @Override
  160. public final void write(Object msg, ChannelPromise promise) {
  161. assertEventLoop();
  162. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
  163. if (outboundBuffer == null) {
  164. // If the outboundBuffer is null we know the channel was closed and so
  165. // need to fail the future right away. If it is not null the handling of the rest
  166. // will be done in flush0()
  167. // See https://github.com/netty/netty/issues/2362
  168. safeSetFailure(promise, newWriteException(initialCloseCause));
  169. // release message now to prevent resource-leak
  170. ReferenceCountUtil.release(msg);
  171. return;
  172. }
  173. int size;
  174. try {
  175. msg = filterOutboundMessage(msg);
  176. size = pipeline.estimatorHandle().size(msg);
  177. if (size < 0) {
  178. size = 0;
  179. }
  180. } catch (Throwable t) {
  181. safeSetFailure(promise, t);
  182. ReferenceCountUtil.release(msg);
  183. return;
  184. }
  185. outboundBuffer.addMessage(msg, size, promise);
  186. }
  187. /**
  188. * 将缓冲区中待发送的消息全部写入 Channel,并发送给通信对方
  189. */
  190. @Override
  191. public final void flush() {
  192. assertEventLoop();
  193. ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
  194. if (outboundBuffer == null) {
  195. return;
  196. }
  197. outboundBuffer.addFlush();
  198. flush0();
  199. }
  200. @SuppressWarnings("deprecation")
  201. protected void flush0() {
  202. if (inFlush0) {
  203. // Avoid re-entrance
  204. return;
  205. }
  206. final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
  207. if (outboundBuffer == null || outboundBuffer.isEmpty()) {
  208. return;
  209. }
  210. inFlush0 = true;
  211. // Mark all pending write requests as failure if the channel is inactive.
  212. if (!isActive()) {
  213. try {
  214. if (isOpen()) {
  215. outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
  216. } else {
  217. // Do not trigger channelWritabilityChanged because the channel is closed already.
  218. outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
  219. }
  220. } finally {
  221. inFlush0 = false;
  222. }
  223. return;
  224. }
  225. try {
  226. doWrite(outboundBuffer);
  227. } catch (Throwable t) {
  228. if (t instanceof IOException && config().isAutoClose()) {
  229. /**
  230. * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
  231. * failing all flushed messages and also ensure the actual close of the underlying transport
  232. * will happen before the promises are notified.
  233. *
  234. * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
  235. * may still return {@code true} even if the channel should be closed as result of the exception.
  236. */
  237. initialCloseCause = t;
  238. close(voidPromise(), t, newFlush0Exception(t), false);
  239. } else {
  240. try {
  241. shutdownOutput(voidPromise(), t);
  242. } catch (Throwable t2) {
  243. initialCloseCause = t;
  244. close(voidPromise(), t2, newFlush0Exception(t), false);
  245. }
  246. }
  247. } finally {
  248. inFlush0 = false;
  249. }
  250. }
  251. }
  252. }

AbstractNioUnsafe

AbstractNioUnsafe 是 AbstractUnsafe 类 的 NIO 实现,它主要实现了 connect 、finishConnect 等方法。

  1. public abstract class AbstractNioChannel extends AbstractChannel {
  2. /**
  3. * 获取当前的连接状态进行缓存,然后发起连接操作。
  4. */
  5. @Override
  6. public final void connect(
  7. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  8. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  9. return;
  10. }
  11. try {
  12. if (connectPromise != null) {
  13. // Already a connect in process.
  14. throw new ConnectionPendingException();
  15. }
  16. boolean wasActive = isActive();
  17. if (doConnect(remoteAddress, localAddress)) {
  18. fulfillConnectPromise(promise, wasActive);
  19. } else {
  20. connectPromise = promise;
  21. requestedRemoteAddress = remoteAddress;
  22. // Schedule connect timeout.
  23. int connectTimeoutMillis = config().getConnectTimeoutMillis();
  24. if (connectTimeoutMillis > 0) {
  25. connectTimeoutFuture = eventLoop().schedule(new Runnable() {
  26. @Override
  27. public void run() {
  28. ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
  29. ConnectTimeoutException cause =
  30. new ConnectTimeoutException("connection timed out: " + remoteAddress);
  31. if (connectPromise != null && connectPromise.tryFailure(cause)) {
  32. close(voidPromise());
  33. }
  34. }
  35. }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
  36. }
  37. promise.addListener(new ChannelFutureListener() {
  38. @Override
  39. public void operationComplete(ChannelFuture future) throws Exception {
  40. if (future.isCancelled()) {
  41. if (connectTimeoutFuture != null) {
  42. connectTimeoutFuture.cancel(false);
  43. }
  44. connectPromise = null;
  45. close(voidPromise());
  46. }
  47. }
  48. });
  49. }
  50. } catch (Throwable t) {
  51. promise.tryFailure(annotateConnectException(t, remoteAddress));
  52. closeIfClosed();
  53. }
  54. }
  55. /**
  56. * 对 TCP三次握手连接结果 进行判断
  57. */
  58. @Override
  59. public final void finishConnect() {
  60. // Note this method is invoked by the event loop only if the connection attempt was
  61. // neither cancelled nor timed out.
  62. assert eventLoop().inEventLoop();
  63. try {
  64. boolean wasActive = isActive();
  65. doFinishConnect();
  66. fulfillConnectPromise(connectPromise, wasActive);
  67. } catch (Throwable t) {
  68. fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
  69. } finally {
  70. // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
  71. // See https://github.com/netty/netty/issues/1770
  72. if (connectTimeoutFuture != null) {
  73. connectTimeoutFuture.cancel(false);
  74. }
  75. connectPromise = null;
  76. }
  77. }
  78. }
  79. }