spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

时间:2024-10-04 13:59:59

 

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解

在《spark2.1.0之源码分析——RPC传输管道处理器详解》一文中详细介绍了TransportRequestHandler。

由于TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,所以这里对RpcHandler首先做个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单1。

代码清单1         RpcHandler的实现

  1. public abstract class RpcHandler {
  2. private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
  3. public abstract void receive(
  4. TransportClient client,
  5. ByteBuffer message,
  6. RpcResponseCallback callback);
  7. public abstract StreamManager getStreamManager();
  8. public void receive(TransportClient client, ByteBuffer message) {
  9. receive(client, message, ONE_WAY_CALLBACK);
  10. }
  11. public void channelActive(TransportClient client) { }
  12. public void channelInactive(TransportClient client) { }
  13. public void exceptionCaught(Throwable cause, TransportClient client) { }
  14. private static class OneWayRpcCallback implements RpcResponseCallback {
  15. private static final Logger logger = ();
  16. @Override
  17. public void onSuccess(ByteBuffer response) {
  18. ("Response provided for one-way RPC.");
  19. }
  20. @Override
  21. public void onFailure(Throwable e) {
  22. ("Error response provided for one-way RPC.", e);
  23. }
  24. }
  25. }

代码清单1中RpcHandler的各个方法的作用如下:

  • receive:这是一个抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现。receive接收三个参数,分别是TransportClientByteBufferRpcResponseCallbackRpcResponseCallback用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback都会被调用一次。RpcResponseCallback的接口定义如下:
  1. public interface RpcResponseCallback {
  2. void onSuccess(ByteBuffer response);
  3. void onFailure(Throwable e);
  4. }
  • 重载的receive:只接收TransportClientByteBuffer两个参数,RpcResponseCallback为默认的ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,从代码清单3-27中OneWayRpcCallback的实现可以看出其onSuccessonFailure只是打印日志,并没有针对客户端做回复处理。
  • channelActive:当与给定客户端相关联的channel处于活动状态时调用。
  • channelInactive:当与给定客户端相关联的channel处于非活动状态时调用。
  • exceptionCaught:当channel产生异常时调用。
  • getStreamManager:获取StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态。、

介绍完RpcHandler,现在回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单2。

代码清单2         TransportRequestHandler的handle方法

  1. @Override
  2. public void handle(RequestMessage request) {
  3. if (request instanceof ChunkFetchRequest) {
  4. processFetchRequest((ChunkFetchRequest) request);
  5. } else if (request instanceof RpcRequest) {
  6. processRpcRequest((RpcRequest) request);
  7. } else if (request instanceof OneWayMessage) {
  8. processOneWayMessage((OneWayMessage) request);
  9. } else if (request instanceof StreamRequest) {
  10. processStreamRequest((StreamRequest) request);
  11. } else {
  12. throw new IllegalArgumentException("Unknown request type: " + request);
  13. }
  14. }

结合代码清单2,下面逐一详细分析这四种类型请求的处理过程。

处理块获取请求

         processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单3。

代码清单3         processFetchRequest的实现

  1. private void processFetchRequest(final ChunkFetchRequest req) {
  2. if (()) {
  3. ("Received req from {} to fetch block {}", getRemoteAddress(channel),
  4. );
  5. }
  6. ManagedBuffer buf;
  7. try {
  8. (reverseClient, );
  9. (channel, );
  10. buf = (, );
  11. } catch (Exception e) {
  12. (("Error opening block %s for request from %s",
  13. , getRemoteAddress(channel)), e);
  14. respond(new ChunkFetchFailure(, (e)));
  15. return;
  16. }
  17. respond(new ChunkFetchSuccess(, buf));
  18. }

代码清单3中的streamManager是通过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的checkAuthorization方法,校验客户端是否有权限从给定的流中读取;
  2. 调用StreamManager的registerChannel方法,将一个流和一条(只能是一条)客户端的TCP连接关联起来,这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了;
  3. 调用StreamManager的getChunk方法,获取单个的块(块被封装为ManagedBuffer)。由于单个的流只能与单个的TCP连接相关联,因此getChunk方法不能为了某个特殊的流而并行调用;
  4. 将ManagedBuffer和流的块Id封装为ChunkFetchSuccess后,调用respond方法返回给客户端。

有关StreamManager的具体实现,读者可以参考《Spark内核设计的艺术 架构设计与实现》一书5.3.5节介绍的NettyStreamManager和6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。

处理RPC请求

         processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单4。

代码清单4         processRpcRequest的实现

  1. private void processRpcRequest(final RpcRequest req) {
  2. try {
  3. (reverseClient, ().nioByteBuffer(), new RpcResponseCallback() {
  4. @Override
  5. public void onSuccess(ByteBuffer response) {
  6. respond(new RpcResponse(, new NioManagedBuffer(response)));
  7. }
  8. @Override
  9. public void onFailure(Throwable e) {
  10. respond(new RpcFailure(, (e)));
  11. }
  12. });
  13. } catch (Exception e) {
  14. ("Error while invoking RpcHandler#receive() on RPC id " + , e);
  15. respond(new RpcFailure(, (e)));
  16. } finally {
  17. ().release();
  18. }
  19. }

代码清单4中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类作为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由于RpcHandler是抽象类(见代码清单1),其receive方法也是抽象方法,所以具体的操作将由RpcHandler的实现了receive方法的子类来完成。所有继承RpcHandler的子类都需要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,无论处理结果成功还是失败,都将调用respond方法对客户端进行响应。

处理流请求

         processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单5。

代码清单5         processStreamRequest的实现

  1. private void processStreamRequest(final StreamRequest req) {
  2. ManagedBuffer buf;
  3. try {
  4. buf = ();// 将获取到的流数据封装为ManagedBuffer
  5. } catch (Exception e) {
  6. ((
  7. "Error opening stream %s for request from %s", , getRemoteAddress(channel)), e);
  8. respond(new StreamFailure(, (e)));
  9. return;
  10. }
  11. if (buf != null) {
  12. respond(new StreamResponse(, (), buf));
  13. } else {
  14. respond(new StreamFailure(, (
  15. "Stream '%s' was not found.", )));
  16. }
  17. }

代码清单5中也使用了RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的openStream方法将获取到的流数据封装为ManagedBuffer;
  2. 当成功或失败时调用respond方法向客户端响应。

处理无需回复的RPC请求

         processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单6。

代码清单6         processOneWayMessage的实现

  1. private void processOneWayMessage(OneWayMessage req) {
  2. try {
  3. (reverseClient, ().nioByteBuffer());
  4. } catch (Exception e) {
  5. ("Error while invoking RpcHandler#receive() for one-way message.", e);
  6. } finally {
  7. ().release();
  8. }
  9. }

processOneWayMessage方法的实现processRpcRequest非常相似,区别在于processOneWayMessage调用了代码清单1中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在处理完RPC请求后不会对客户端作出响应。

         从以上四种处理的分析可以看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其余三种消息都是最终调用respond方法响应客户端,其实现见代码清单7。

代码清单7         respond的实现

  1. private void respond(final Encodable result) {
  2. final SocketAddress remoteAddress = ();
  3. (result).addListener(
  4. new ChannelFutureListener() {
  5. @Override
  6. public void operationComplete(ChannelFuture future) throws Exception {
  7. if (()) {
  8. ("Sent result {} to client {}", result, remoteAddress);
  9. } else {
  10. (("Error sending result %s to %s; closing connection",
  11. result, remoteAddress), ());
  12. ();
  13. }
  14. }
  15. }
  16. );
  17. }

可以看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。

 


[1] Channel的writeAndFlush方法涉及Netty的实现细节及原理,这并不是本书所要阐述的内容,有兴趣的读者可以访问Netty官网:获取更多信息。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:/