服务端一般使用 2 个 Group, bossGroup 和 workGroup。bossGroup 是 mainReactor, workGroup 是 subReactor,数量与可用 CPU 相等。业务 decode-compute-encode 有专门的线程池处理。
客户端一般使用 EventLoopGroup 启动。
一个 Bootstrap 的启动,只能发起对一个远程的地址。所以只会使用一个 NIO Selector ,也就是说仅使用一个 Reactor 。即使,我们在声明使用一个 EventLoopGroup ,该 EventLoopGroup 也只会分配一个 EventLoop 对 IO 事件进行处理。
AbstractBootstrap#bind(int port)
AbstractBootstrap#doBind(SocketAddress)
ChannelFuture <- AbstractBootstrap#initAndRegister
io.netty.channel.NioServerSocketChannel <- ChannelFactory#newChannel
// 初始化 Channel 属性, 从 Channel 获取 ChannelPipeline
AbstractBootstrap#init(io.netty.channel.Channel)
// 注册 Channel 到 EventLoopGroup
ChannelFuture <- ChannelFuture <- EventLoopGroup#register(Channel)
MultithreadEventLoopGroup#next#register(Channel)
// 这里会把 Channel 包装成 DeafaultChannelPromise(ChannelFuture 实现)
ChannelFuture <- SingleThreadEventLoop#register(Channel)
// 反过来在 Channel.Unsafe#register(EventLoop, ChannelPromise);
AbstractChannel$AbstractUnsafe#register(EventLoop, ChannelPromise)
AbstractChannel$AbstractUnsafe#register0(ChannelPromise)
这里, 如果之前步骤中 Channel 注册完成, 直接执行 doBind0
反之,往 ChannelFuture 注册 bind 回调 doBind0
功用就是 bind localAdrress 开始接受 socket 请求,同时再注册失败回调
AbstractChannel$AbstractUnsafe#register0(ChannelPromise)
// 注册 EventLoop.Selector\\attach Channel 到 jdk
// 调用 java.nio.channels.spi.
// AbstractSelectableChannel.register(Selector, int,Object att)
// attachment 是 Netty io.netty.channel.nio.AbstractNioChannel。
AbstractNioChannel#doRegister
ChannelPipeline#invokeHandlerAddedIfNeeded
// 触发 ChannelPromise 的 Listener, 在
AbstractChannel#safeSetSuccess(ChannelPromise)
DefaultChannelPipeline#fireChannelRegistered
AbstractChannelHandlerContext#invokeChannelRegistered(head AbstractChannelHandlerContext)
AbstractChannelHandlerContext#invokeChannelRegistered
invokeHandler
AbstractChannelHandlerContext$HeadContext(ChannelInboundHandler)
#channelRegistered(AbstractChannelHandlerContext)
// firstRegistration 则调用 callHandlerAddedForAllHandlers
// 执行 PendingHandlerCallback
invokeHandlerAddedIfNeeded
ChannelHandlerContext#channelRegistered
// 通知注册成功
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
ChannelPipeline#bind(SocketAddress,ChannelPromise) -> ChannelFuture
ChannelPipeline.tail.bind(SocketAddress,ChannelPromise) -> ChannelFuture
ChannelPipeline$HeadContext.bind(ChannelHandlerContext,SocketAddress,ChannelPromise)
unsafe.bind(SocketAddress,ChannelPromise)
ChannelPipeline#fireChannelActive
AbstractChannelHandlerContext#invokeChannelActive(head)
AbstractChannelHandlerContext$HeadContext#fireChannelActive
AbstractChannelHandlerContext$HeadContext#readIfIsAutoRead
AbstractChannel#read
DefaultChannelPipeline.tail#read
AbstractChannel$AbstractUnsafe#read
// 这里会把之前注册 0, 替换成 SelectionKey.OP_ACCEPT(服务端)
// 或者 SelectionKey.OP_CONNECT (客户端)
AbstractNioMessageChannel#doBeginRead
AbstractBootstrap#init(Channel)
服务端启动,由于是多 Reactor 模式,这里是在 mainReactor 中添加一个 ServerBootstrapAcceptor。
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//为 Channel 添加 Acceptor
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
ServerBootstrapAcceptor 也是一个 ChannelInboundHandler,它会在 unsafe 读取完成后被触发,由于主 Reactor 只负责 dispatch, 因此这个 Acceptor 就是把任务 dispatch 给 childEventLoopGroup。
childEventLoopGroup 里面会有很多 EventLoop 处理读写。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
AbstractNioChannel#doRegister → AbstractSelectableChannel#register