我是怎样阅读 Netty Channel 源码的

arctan0 2019-06-30

Channel 功能说明

我在使用 ServerBootstrap 来创建服务的时候通过 channel(NioServerSocketChannel.class) 来设置 Channel, 那么 Channel 的主要作用是什么呢?

在分析 Channel 之前, 需要先弄懂 NioServerSocketChannel 类, 它的功能对于 JDK NIO 类库中的
ServerSocketChannel 类, 它是用来监听TCP连接的(并不管读写).

NioServerSocketChannel 最主要就是实现 Channel 接口. Channel 接口, 采用 Facade 模式进行统一封装, 将网络的读、写, 客户端发起连接, 主动关闭连接, 链路关闭, 获取通信双方的网络地址等.

Channel 接口下有一个重要的抽象类 AbstractChannel, 一些公共的基础方法都在这个抽象类中实现, 而特定一些功能, 都可以通过各个不同的实现类去实现, 这样最大程度的实现功能和接口的重用.

为什么不实用 JDK NIO 原生的 Channel 而要另起炉灶呢?

  1. JDK SocketChannelServerSocketChannel 没有统一的 Channel.
  2. JDK 的 SocketChannelServerSocketChannel 的主要职责就是网络 IO 操作, 由于他们的 SPI 类接口, 由具体的虚拟机厂家提供, 所以通过继承 SPI 功能类来扩展其功能的难度很大; SocketChannelServerSocketChannel 抽象类, 其工作量和重新开发一个新的 Channel 功能类是差不多的.
  3. Netty 的 Channel 需要能够跟 Netty 的整体架构融合在一起, 例如 IO 模型、基于 ChannelPipeline 的定制模型, 以及基于元数据描述配置化的 TCP 参数等, 这些 JDK 的 SocketChannelServerSocketChannel 都没有提供, 需要重新封装.
  4. 自定义的 Channel, 功能实现更加灵活.

Channel 方法介绍

我是怎样阅读 Netty Channel 源码的

Channel read(): 从当前的 Channel 中读取数据到第一个 inbound 缓冲区中, 如果数据被成功读取, 触发 ChannelHandler.channelRead(ChannelHandlerContext, Object) 事件.

读取操作 API 调用完成之后, 紧接着会触发 ChannelHandler.channelReadComplete(ChannelHandlerContext) 事件, 这样业务的 ChannelHandler 可以决定是否需要继承读取数据. 如果已经有读操作请求被挂起, 则后续的读操作会被忽略.

ChannelFuture write(Object): 请求将当前的 msg 通过 ChannelPipeline 写入到目标 Channel 中. 注意, write 操作只是将消息存入到消息发送环数组中, 并没有真正被发送, 只有调用 flush 操作才会被写入到 Channel 中, 发送给对方.

ChannelFuture write(Object, ChannelPromise): 功能与 Channel read() 相同, 但是携带了 ChannelPromise 参数负责设置写入操作的结果.

ChannelFuture writeAndFlush(Objec, ChannelPromise): 与上面方法功能类似, 不同之处在于它会将消息写入 Channel 中发送, 等价于单独调用 writeflush 操作的组合.

Channel flush(): 将之前写入到发送环形中的消息全部写入到目标 Channel 中, 发送给通信对方.

ChannelFuture close(ChannelPromise): 主动关闭当前连接, 通过 ChannelPromise 设置操作结果并执行结果通知, 无论操作是否成功, 都可以通过 ChannelPromise 获取操作结果. 该操作会级联触发 ChannelPipeline 中所有 ChannelHandlerChannelHandler.close(ChannelHandlerContext, ChannelPromise) 事件.

ChannelFuture disconnect(ChannelPromise): 请求断开与远程通信对端的连接并使用 ChannelPromise 来获取操作结果的通知消息. 该方法会级联触发 ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise) 事件.

ChannelFuture connect(SocketAddress remoteAddress): 客户端使用指定的服务端地址 remoteAddress 发起连接请求, 如果连接因为应答超时而失败, ChannelFuture 中的操作结果就是 ConnectTimeoutException 异常; 如果连接被拒绝, 操作结果为 ConnectException. 该方法会级联触发 ChannelHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) 事件.

ChannelFuture bind(SocketAddress localAddress): 绑定指定的本地 Socket 地址 localAddress, 该方法会级联触发 ChannelHandler.bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 事件.

ChannelConfig config(): 获取当前 Channel 的配置信息, 例如 CONNECT_TIMEOUT_MILLIS.

boolean isOpen(): 判断当前 Channel 是否已经打开.

boolean isRegistered(): 判断当前 Channel 是否已经注册到 EventLoop 上.

boolean isActive(): 判断当前 Channel 是否已经处于激活状态.

ChannelMetadata metadata(): 获取当前 Channel 的元数据描述信息, 包括 TCP 参数配置等.

SocketAddress localAddress(): 获取当前 Channel 的本地绑定地址.

SocketAddress remoteAddress(): 获取当前 Channel 通信的远程 Socket 地址.

eventLoop(): Channel 需要注册到 EventLoop 的多路复用器上, 用于处理 IO 事件, 通过 eventLoop() 方法可以获取到 Channel 上注册的 EventLoop. EventLoop 本质上就是处理网络读写事件的 Reactor 线程. 也可以用来执行定时任务和用户自定义 NioTask 等任务.

id(): 它返回 ChannelId 对象, 是 Channel 的唯一标识, 它的可能生成策略如下:

  1. 机器的 MAC 地址(EUI-48 或 EUI-64) 等可以代表全局唯一的信息;
  2. 当前的进程 ID;
  3. 当前系统的毫秒-- System.currentTimeMillis();
  4. 当前系统的纳秒-- System.nanoTime();
  5. 32 位的随机整数;
  6. 32 位自增的序列数.

AbstractChannel 源码分析

AstractChannel 聚合了所有 Channel 使用到的对象, 由 AbstractChannel 提供初始化和统一封装, 如果功能和具体子类相关, 则定义成抽象方法由子类具体实现.

static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();

static {
    CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}

private MessageSizeEstimator.Handle estimatorHandle;

private final Channel parent;
private final ChannelId id = DefaultChannelId.newInstance();
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);

....
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
    this.parent = parent;
    this.eventLoop = validate(eventLoop);
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

每一个 Channel 中包含一个 ChannelPipeline. 在 AbstractChannel 的构造方法中初始化. AbstractChannel 也提供了一些公共 API 的具体实现, 例如 localAddress()remoteAddress().

当 Channel 进行 IO 操作时会产生对应的 IO 事件, 然后驱动事件在 ChannelPipeline 中传播, 由对应的 ChannelHandler 对事件进行拦截和处理, 不关心的事件可以直接忽略.

网络 IO 操作直接调用 DefaultChannelPipeline 的相关方法, 由 DefaultChannelPipeline 中对应的 ChannelHandler 进行具体逻辑的处理.

相关推荐