Startup
forPort(port) → ServerProvider → NettyServerProvider → NettyServerBuilder
Server ← NettyServerBuilder.build() → buildTransportServers()
In gRPC the mainReactor (boss event loop group) uses a single thread, while the subReactor (worker event loop group) thread count equals the number of available CPUs.
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
    .addService(new GreeterImpl())
    .build()
    .start();
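The reactor sizing described above can also be set explicitly. A minimal sketch with the unshaded grpc-netty NettyServerBuilder (the group sizes merely mirror the text above and are illustrative; GreeterImpl is the service from the example):
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class CustomReactorServer {
  public static void main(String[] args) throws Exception {
    // boss group = mainReactor (accepts connections), worker group = subReactor (connection I/O)
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());

    Server server = NettyServerBuilder.forPort(50051)
        .bossEventLoopGroup(boss)
        .workerEventLoopGroup(worker)
        .channelType(NioServerSocketChannel.class) // match the channel type to the supplied groups
        .addService(new GreeterImpl())
        .build()
        .start();
    server.awaitTermination();
  }
}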
@Override
public ServerImpl start() throws IOException {
  synchronized (lock) {
    checkState(!started, "Already started");
    checkState(!shutdown, "Shutting down");
    // Start and wait for any ports to actually be bound.
    ServerListenerImpl listener = new ServerListenerImpl();
    for (InternalServer ts : transportServers) {
      ts.start(listener);
      activeTransportServers++;
    }
    executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
    started = true;
    return this;
  }
}
NettyServer#start
NettyServer assembles the gRPC server's resources; the actual per-connection initialization is done by NettyServerTransport, which is created inside the channel initializer of NettyServer#start and combines:
GrpcHttp2ConnectionHandler (NettyServerHandler / NettyClientHandler)
ProtocolNegotiator for protocol negotiation (plaintext HTTP/2 or TLS HTTP/2); all implementations live in io.grpc.netty.shaded.io.grpc.netty.ProtocolNegotiators
WriteBufferingAndExceptionHandler for write buffering
NettyServerTransport transport = new NettyServerTransport(
    ch, channelDone, NettyServer.this.protocolNegotiator,
    NettyServer.this.streamTracerFactories, NettyServer.this.transportTracerFactory.create(),
    NettyServer.this.maxStreamsPerConnection, NettyServer.this.flowControlWindow,
    NettyServer.this.maxMessageSize, NettyServer.this.maxHeaderListSize,
    NettyServer.this.keepAliveTimeInNanos, NettyServer.this.keepAliveTimeoutInNanos,
    NettyServer.this.maxConnectionIdleInNanos, maxConnectionAgeInNanos,
    NettyServer.this.maxConnectionAgeGraceInNanos, NettyServer.this.permitKeepAliveWithoutCalls,
    NettyServer.this.permitKeepAliveTimeInNanos);
transport.start(transportListener);
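For orientation, the snippet above sits inside the ChannelInitializer that NettyServer#start installs as the ServerBootstrap child handler. A trimmed sketch of that surrounding structure (shape only, details elided):
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);   // mainReactor / subReactor from the builder
b.channel(channelType);
b.childHandler(new ChannelInitializer<Channel>() {
  @Override
  public void initChannel(Channel ch) {
    ChannelPromise channelDone = ch.newPromise();
    // one NettyServerTransport per accepted connection (full argument list shown above)
    NettyServerTransport transport = new NettyServerTransport(/* ... */);
    ServerTransportListener transportListener = listener.transportCreated(transport);
    transport.start(transportListener);
  }
});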
NettyServerTransport#start
public void start(ServerTransportListener listener) {
  Preconditions.checkState(this.listener == null, "Handler already registered");
  this.listener = listener;
  this.grpcHandler = this.createHandler(listener, this.channelUnused);
  NettyHandlerSettings.setAutoWindow(this.grpcHandler);
  ChannelHandler negotiationHandler = this.protocolNegotiator.newHandler(this.grpcHandler);
  ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);

  final class TerminationNotifier implements ChannelFutureListener {
    boolean done;

    public void operationComplete(ChannelFuture future) throws Exception {
      if (!this.done) {
        this.done = true;
        NettyServerTransport.this.notifyTerminated(NettyServerTransport.this.grpcHandler.connectionError());
      }
    }
  }

  // Register the termination listener and the buffering handler on the Netty channel
  ChannelFutureListener terminationNotifier = new TerminationNotifier();
  this.channelUnused.addListener(terminationNotifier);
  this.channel.closeFuture().addListener(terminationNotifier);
  this.channel.pipeline().addLast(new ChannelHandler[]{bufferingHandler});
}
WriteBufferingAndExceptionHandler extends ChannelDuplexHandler
ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler
ChannelDuplexHandler is Netty's combined inbound/outbound handler: it supports both read and write events, and like the other Netty adapter classes it simply delegates outbound operations to the ChannelHandlerContext.
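A minimal sketch of that duplex pattern (the class name is illustrative, not part of gRPC); both overrides just reproduce the adapter defaults:
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

class PassThroughDuplexHandler extends ChannelDuplexHandler {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // inbound path: hand the message to the next inbound handler
    ctx.fireChannelRead(msg);
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    // outbound path: delegate the write to the ChannelHandlerContext
    ctx.write(msg, promise);
  }
}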
// Fields
private static final Logger logger = Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName());
private final Queue<WriteBufferingAndExceptionHandler.ChannelWrite> bufferedWrites = new ArrayDeque<>();
private final ChannelHandler next;
private boolean writing;
private boolean flushRequested;
private Throwable failCause;
WriteBufferingAndExceptionHandler method implementations
// Keeps this handler last in the pipeline: the wrapped handler is inserted in front of it, then protocol negotiation is kicked off
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  ctx.pipeline().addBefore(ctx.name(), null, next);
  super.handlerAdded(ctx);
  // kick off protocol negotiation.
  ctx.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
}
// Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
// called, or we have somehow failed. If we have already failed in the past, then the write
// will fail immediately.
// Writes are buffered here rather than flushed to the socket immediately
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  if (failCause != null) {
    promise.setFailure(failCause);
    ReferenceCountUtil.release(msg);
  } else {
    bufferedWrites.add(new ChannelWrite(msg, promise));
  }
}
/**
 * Calls to this method will not trigger an immediate flush. The flush will be deferred until
 * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
 */
@Override
public void flush(ChannelHandlerContext ctx) {
  /*
   * Swallowing any flushes is not only an optimization but also required
   * for the SslHandler to work correctly. If the SslHandler receives multiple
   * flushes while the handshake is still ongoing, then the handshake "randomly"
   * times out. Not sure at this point why this is happening. Doing a single flush
   * seems to work but multiple flushes don't ...
   */
  flushRequested = true;
}
@SuppressWarnings("FutureReturnValueIgnored")
final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
  // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready.
  if (!ctx.channel().isActive() || writing) {
    return;
  }
  // Make sure that method can't be reentered, so that the ordering
  // in the queue can't be messed up.
  writing = true;
  while (!bufferedWrites.isEmpty()) {
    ChannelWrite write = bufferedWrites.poll();
    ctx.write(write.msg, write.promise);
  }
  if (flushRequested) {
    ctx.flush();
  }
  // Removal has to happen last as the above writes will likely trigger
  // new writes that have to be added to the end of queue in order to not
  // mess up the ordering.
  ctx.pipeline().remove(this);
}
// Reads are not expected here; if one arrives, log a hex dump of the message and fail the channel
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  try {
    if (logger.isLoggable(Level.FINE)) {
      Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
      logger.log(
          Level.FINE,
          "Unexpected channelRead()->{0} reached end of pipeline {1}",
          new Object[] {loggedMsg, ctx.pipeline().names()});
    }
    exceptionCaught(
        ctx,
        Status.INTERNAL.withDescription(
            "channelRead() missed by ProtocolNegotiator handler: " + msg)
            .asRuntimeException());
  } finally {
    ReferenceCountUtil.safeRelease(msg);
  }
}
io.grpc.netty.ProtocolNegotiators mainly provides the following implementations:
serverPlaintext() -> PlaintextProtocolNegotiator
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
  ChannelHandler grpcNegotiationHandler = new GrpcNegotiationHandler(grpcHandler);
  ChannelHandler activeHandler = new WaitUntilActiveHandler(grpcNegotiationHandler);
  return activeHandler;
}
httpProxy(SocketAddress, String proxyUsername, String proxyPassword,
    ProtocolNegotiator negotiator) -> ProxyProtocolNegotiationHandler
serverTls(SslContext) -> server-side TLS negotiator; its newHandler wraps the gRPC handler in a ServerTlsHandler:
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
  ChannelHandler gnh = new GrpcNegotiationHandler(handler);
  ChannelHandler sth = new ServerTlsHandler(gnh, sslContext);
  return new WaitUntilActiveHandler(sth);
}
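For reference, a minimal server-side TLS setup that selects the serverTls negotiator above (unshaded grpc-netty; certificate paths are placeholders):
import java.io.File;
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContext;

public class TlsServer {
  public static void main(String[] args) throws Exception {
    // placeholder certificate chain and private key files
    SslContext sslContext = GrpcSslContexts
        .forServer(new File("server.crt"), new File("server.key"))
        .build();

    Server server = NettyServerBuilder.forPort(443)
        .sslContext(sslContext)   // without this, the plaintext negotiator is used
        .addService(new GreeterImpl())
        .build()
        .start();
    server.awaitTermination();
  }
}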
plaintextUpgrade() -> PlaintextUpgradeProtocolNegotiator
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
  ChannelHandler upgradeHandler =
      new Http2UpgradeAndGrpcHandler(grpcHandler.getAuthority(), grpcHandler);
  return new WaitUntilActiveHandler(upgradeHandler);
}
WaitUntilActiveHandler
Waits for the channel to be active, and then installs the next Handler.
Using this allows subsequent handlers to assume the channel is active and ready to send.
Additionally, this fires a {@link ProtocolNegotiationEvent} carrying the connection addresses.
Http2UpgradeAndGrpcHandler
Acts as a combination of Http2Upgrade and {@link GrpcNegotiationHandler}. Unfortunately,
this negotiator doesn't follow the pattern of "just one handler doing negotiation at a time."
This is due to the tight coupling between the upgrade handler and the HTTP/2 handler.