gRPC源码分析2-Server的建立

时间:2021-01-03 17:01:51

gRPC中,Server、Client共享的Class不是很多,所以我们可以单独的分别讲解Server和Client的源码。

 

通过第一篇,我们知道对于gRPC来说,建立Server是非常简单的,还记得怎么写的?还是以example里 HelloWorldServer 例子来看

server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();

 

你没有看错,就是这么几行搞定。

 

如果需要看懂gRPC的源码,首先有几点需要明白

  • Builder模式生成Entity

  • Provider(SPI)模式解耦,动态选择服务提供方

  • abstract class用于扩展

 

0. 流程图

gRPC源码分析2-Server的建立

1. Builder

gRPC源码分析2-Server的建立

ServerBuilder是一个抽象类,不同的服务提供方(Provider),将继承实现它。如何找到这些继承者呢?ServerProvider就是用来找到不同的provider的。

 

2. Provider

gRPC源码分析2-Server的建立

 

如上图,ServerProvider也是一个抽象类,实现者都有哪些呢?我们通过SPI模式找到他们。

 

通过搜索文件知道gRPC中 io.grpc.ServerProvider 的实现方只有:Netty

gRPC源码分析2-Server的建立

 

io.grpc.netty.NettyServerProvider,
这个类就是ServerProvider的实现者,它的builderForPort返回ServerBuilder

gRPC源码分析2-Server的建立

3. NettyServer

最后,我们来看下当链接建立时是如何创建handle的。

public void initChannel(Channel ch) throws Exception {
eventLoopReferenceCounter.retain();
ch.closeFuture().addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
eventLoopReferenceCounter.release();
}
});
NettyServerTransport transport
= new NettyServerTransport(ch, protocolNegotiator,
maxStreamsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize);
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
if (channel != null && !channel.isOpen()) {
// Server already shutdown.
ch.close();
return;
}

transportListener
= listener.transportCreated(transport);
}
transport.start(transportListener);
}

看code可知,当一个链接建立时,会生成一个NettyServerTransport,所有的数据处理都将在这里实现。

 

4. NettyServerTransport

public void start(ServerTransportListener listener) {
Preconditions.checkState(
this.listener == null, "Handler already registered");
this.listener = listener;

// Create the Netty handler for the pipeline.
final NettyServerHandler grpcHandler = createHandler(listener);
HandlerSettings.setAutoWindow(grpcHandler);

// Notify when the channel closes.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyTerminated(grpcHandler.connectionError());
}
});

ChannelHandler negotiationHandler
= protocolNegotiator.newHandler(grpcHandler);
channel.pipeline().addLast(negotiationHandler);
}

我们看到当调用start方法是,最重要的就是createHandle,在这个方法里将看到如何绑定HTTP/2的处理器的。

 

5. NettyServerHandle

static NettyServerHandler newHandler(ServerTransportListener transportListener,
int maxStreams,
int flowControlWindow,
int maxHeaderListSize,
int maxMessageSize) {
Preconditions.checkArgument(maxHeaderListSize
> 0, "maxHeaderListSize must be positive");
// 就是一个log
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
Http2HeadersDecoder headersDecoder
= new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
// reader
Http2FrameReader frameReader = new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(headersDecoder), frameLogger);
// writer
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
maxMessageSize);
}

@VisibleForTesting
static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
ServerTransportListener transportListener,
int maxStreams,
int flowControlWindow,
int maxMessageSize) {
Preconditions.checkArgument(maxStreams
> 0, "maxStreams must be positive");
Preconditions.checkArgument(flowControlWindow
> 0, "flowControlWindow must be positive");
Preconditions.checkArgument(maxMessageSize
> 0, "maxMessageSize must be positive");
// 一个channel一个connection
Http2Connection connection = new DefaultHttp2Connection(true);

// Create the local flow controller configured to auto-refill the connection window.
connection.local().flowController(
new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));


Http2ConnectionEncoder encoder
= new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2ConnectionDecoder decoder
= new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);

Http2Settings settings
= new Http2Settings();
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);

return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
}

gRPC源码分析2-Server的建立