服务端模式 Reactor 模型

服务端一般使用 2 个 Group, bossGroup 和 workGroup。bossGroup 是 mainReactor, workGroup 是 subReactor,数量与可用 CPU 相等。业务 decode-compute-encode 有专门的线程池处理。

客户端模式 Reactor 模型

客户端一般使用 EventLoopGroup 启动。

一个 Bootstrap 的启动,只能发起对一个远程的地址。所以只会使用一个 NIO Selector ,也就是说仅使用一个 Reactor 。即使,我们在声明使用一个 EventLoopGroup ,该 EventLoopGroup 也只会分配一个 EventLoop 对 IO 事件进行处理。

启动

AbstractBootstrap#bind(int port)
AbstractBootstrap#doBind(SocketAddress)
  ChannelFuture <- AbstractBootstrap#initAndRegister
    io.netty.channel.NioServerSocketChannel <- ChannelFactory#newChannel
    // 初始化 Channel 属性, 从 Channel 获取 ChannelPipeline 
    AbstractBootstrap#init(io.netty.channel.Channel)
    // 注册 Channel 到 EventLoopGroup
    ChannelFuture <- ChannelFuture <- EventLoopGroup#register(Channel)
      MultithreadEventLoopGroup#next#register(Channel)
      // 这里会把 Channel 包装成 DeafaultChannelPromise(ChannelFuture 实现)
      ChannelFuture <- SingleThreadEventLoop#register(Channel)
         // 反过来在 Channel.Unsafe#register(EventLoop, ChannelPromise);
         AbstractChannel$AbstractUnsafe#register(EventLoop, ChannelPromise)
         AbstractChannel$AbstractUnsafe#register0(ChannelPromise)
   这里, 如果之前步骤中 Channel 注册完成, 直接执行 doBind0
   反之,往 ChannelFuture 注册 bind 回调 doBind0
   功用就是 bind localAdrress 开始接受 socket 请求,同时再注册失败回调   
         
AbstractChannel$AbstractUnsafe#register0(ChannelPromise)
  // 注册 EventLoop.Selector\\attach Channel 到 jdk
  // 调用 java.nio.channels.spi.
  // AbstractSelectableChannel.register(Selector, int,Object att)
  // attachment 是 Netty io.netty.channel.nio.AbstractNioChannel。
  AbstractNioChannel#doRegister
  ChannelPipeline#invokeHandlerAddedIfNeeded
  // 触发 ChannelPromise 的 Listener, 在 
  AbstractChannel#safeSetSuccess(ChannelPromise)
  DefaultChannelPipeline#fireChannelRegistered
  AbstractChannelHandlerContext#invokeChannelRegistered(head AbstractChannelHandlerContext)
  AbstractChannelHandlerContext#invokeChannelRegistered
    invokeHandler
      AbstractChannelHandlerContext$HeadContext(ChannelInboundHandler)
      #channelRegistered(AbstractChannelHandlerContext)
        // firstRegistration 则调用 callHandlerAddedForAllHandlers
        // 执行 PendingHandlerCallback 
        invokeHandlerAddedIfNeeded
        ChannelHandlerContext#channelRegistered
          // 通知注册成功
          invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));

ChannelPipeline#bind(SocketAddress,ChannelPromise) -> ChannelFuture
  ChannelPipeline.tail.bind(SocketAddress,ChannelPromise) -> ChannelFuture
  ChannelPipeline$HeadContext.bind(ChannelHandlerContext,SocketAddress,ChannelPromise)
    unsafe.bind(SocketAddress,ChannelPromise)
      ChannelPipeline#fireChannelActive
        AbstractChannelHandlerContext#invokeChannelActive(head)
        AbstractChannelHandlerContext$HeadContext#fireChannelActive
        AbstractChannelHandlerContext$HeadContext#readIfIsAutoRead
          AbstractChannel#read
            DefaultChannelPipeline.tail#read
              AbstractChannel$AbstractUnsafe#read
                // 这里会把之前注册 0, 替换成 SelectionKey.OP_ACCEPT(服务端) 
                // 或者 SelectionKey.OP_CONNECT (客户端)
                AbstractNioMessageChannel#doBeginRead

AbstractBootstrap#init(Channel)

服务端启动,由于是多 Reactor 模式,这里是在 mainReactor 中添加一个 ServerBootstrapAcceptor。

setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
        childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

//为 Channel 添加 Acceptor
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

ServerBootstrapAcceptor 也是一个 ChannelInboundHandler,它会在 unsafe 读取完成后被触发,由于主 Reactor 只负责 dispatch, 因此这个 Acceptor 就是把任务 dispatch 给 childEventLoopGroup。

childEventLoopGroup 里面会有很多 EventLoop 处理读写。

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

AbstractNioChannel#doRegister → AbstractSelectableChannel#register