提示:阅读本文前最好先阅读:
- 《Spark2.1.0之内置RPC框架》
- 《spark2.1.0之源码分析——RPC配置TransportConf》
- 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
- 《spark2.1.0之源码分析——RPC服务器TransportServer》
- 《spark2.1.0之源码分析——RPC管道初始化》
- 《spark2.1.0之源码分析——RPC传输管道处理器详解》
在《spark2.1.0之源码分析——RPC传输管道处理器详解》一文中详细介绍了TransportRequestHandler。
由于TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,所以这里对RpcHandler首先做个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单1。
代码清单1 RpcHandler的实现
-
public abstract class RpcHandler {
-
-
private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
-
-
public abstract void receive(
-
TransportClient client,
-
ByteBuffer message,
-
RpcResponseCallback callback);
-
-
public abstract StreamManager getStreamManager();
-
-
public void receive(TransportClient client, ByteBuffer message) {
-
receive(client, message, ONE_WAY_CALLBACK);
-
}
-
-
public void channelActive(TransportClient client) { }
-
-
public void channelInactive(TransportClient client) { }
-
-
public void exceptionCaught(Throwable cause, TransportClient client) { }
-
-
private static class OneWayRpcCallback implements RpcResponseCallback {
-
-
private static final Logger logger = ();
-
-
@Override
-
public void onSuccess(ByteBuffer response) {
-
("Response provided for one-way RPC.");
-
}
-
-
@Override
-
public void onFailure(Throwable e) {
-
("Error response provided for one-way RPC.", e);
-
}
-
-
}
-
-
}
代码清单1中RpcHandler的各个方法的作用如下:
- receive:这是一个抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现。receive接收三个参数,分别是TransportClient、ByteBuffer和RpcResponseCallback。RpcResponseCallback用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback都会被调用一次。RpcResponseCallback的接口定义如下:
-
public interface RpcResponseCallback {
-
void onSuccess(ByteBuffer response);
-
void onFailure(Throwable e);
-
}
- 重载的receive:只接收TransportClient和ByteBuffer两个参数,RpcResponseCallback为默认的ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,从代码清单3-27中OneWayRpcCallback的实现可以看出其onSuccess和onFailure只是打印日志,并没有针对客户端做回复处理。
- channelActive:当与给定客户端相关联的channel处于活动状态时调用。
- channelInactive:当与给定客户端相关联的channel处于非活动状态时调用。
- exceptionCaught:当channel产生异常时调用。
- getStreamManager:获取StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态。、
介绍完RpcHandler,现在回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单2。
代码清单2 TransportRequestHandler的handle方法
-
@Override
-
public void handle(RequestMessage request) {
-
if (request instanceof ChunkFetchRequest) {
-
processFetchRequest((ChunkFetchRequest) request);
-
} else if (request instanceof RpcRequest) {
-
processRpcRequest((RpcRequest) request);
-
} else if (request instanceof OneWayMessage) {
-
processOneWayMessage((OneWayMessage) request);
-
} else if (request instanceof StreamRequest) {
-
processStreamRequest((StreamRequest) request);
-
} else {
-
throw new IllegalArgumentException("Unknown request type: " + request);
-
}
-
}
结合代码清单2,下面逐一详细分析这四种类型请求的处理过程。
处理块获取请求
processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单3。
代码清单3 processFetchRequest的实现
-
private void processFetchRequest(final ChunkFetchRequest req) {
-
if (()) {
-
("Received req from {} to fetch block {}", getRemoteAddress(channel),
-
);
-
}
-
-
ManagedBuffer buf;
-
try {
-
(reverseClient, );
-
(channel, );
-
buf = (, );
-
} catch (Exception e) {
-
(("Error opening block %s for request from %s",
-
, getRemoteAddress(channel)), e);
-
respond(new ChunkFetchFailure(, (e)));
-
return;
-
}
-
-
respond(new ChunkFetchSuccess(, buf));
-
}
代码清单3中的streamManager是通过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤如下:
- 调用StreamManager的checkAuthorization方法,校验客户端是否有权限从给定的流中读取;
- 调用StreamManager的registerChannel方法,将一个流和一条(只能是一条)客户端的TCP连接关联起来,这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了;
- 调用StreamManager的getChunk方法,获取单个的块(块被封装为ManagedBuffer)。由于单个的流只能与单个的TCP连接相关联,因此getChunk方法不能为了某个特殊的流而并行调用;
- 将ManagedBuffer和流的块Id封装为ChunkFetchSuccess后,调用respond方法返回给客户端。
有关StreamManager的具体实现,读者可以参考《Spark内核设计的艺术 架构设计与实现》一书5.3.5节介绍的NettyStreamManager和6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。
处理RPC请求
processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单4。
代码清单4 processRpcRequest的实现
-
private void processRpcRequest(final RpcRequest req) {
-
try {
-
(reverseClient, ().nioByteBuffer(), new RpcResponseCallback() {
-
@Override
-
public void onSuccess(ByteBuffer response) {
-
respond(new RpcResponse(, new NioManagedBuffer(response)));
-
}
-
-
@Override
-
public void onFailure(Throwable e) {
-
respond(new RpcFailure(, (e)));
-
}
-
});
-
} catch (Exception e) {
-
("Error while invoking RpcHandler#receive() on RPC id " + , e);
-
respond(new RpcFailure(, (e)));
-
} finally {
-
().release();
-
}
-
}
代码清单4中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类作为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由于RpcHandler是抽象类(见代码清单1),其receive方法也是抽象方法,所以具体的操作将由RpcHandler的实现了receive方法的子类来完成。所有继承RpcHandler的子类都需要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,无论处理结果成功还是失败,都将调用respond方法对客户端进行响应。
处理流请求
processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单5。
代码清单5 processStreamRequest的实现
-
private void processStreamRequest(final StreamRequest req) {
-
ManagedBuffer buf;
-
try {
-
buf = ();// 将获取到的流数据封装为ManagedBuffer
-
} catch (Exception e) {
-
((
-
"Error opening stream %s for request from %s", , getRemoteAddress(channel)), e);
-
respond(new StreamFailure(, (e)));
-
return;
-
}
-
-
if (buf != null) {
-
respond(new StreamResponse(, (), buf));
-
} else {
-
respond(new StreamFailure(, (
-
"Stream '%s' was not found.", )));
-
}
-
}
代码清单5中也使用了RpcHandler的StreamManager,其处理步骤如下:
- 调用StreamManager的openStream方法将获取到的流数据封装为ManagedBuffer;
- 当成功或失败时调用respond方法向客户端响应。
处理无需回复的RPC请求
processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单6。
代码清单6 processOneWayMessage的实现
-
private void processOneWayMessage(OneWayMessage req) {
-
try {
-
(reverseClient, ().nioByteBuffer());
-
} catch (Exception e) {
-
("Error while invoking RpcHandler#receive() for one-way message.", e);
-
} finally {
-
().release();
-
}
-
}
processOneWayMessage方法的实现processRpcRequest非常相似,区别在于processOneWayMessage调用了代码清单1中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在处理完RPC请求后不会对客户端作出响应。
从以上四种处理的分析可以看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其余三种消息都是最终调用respond方法响应客户端,其实现见代码清单7。
代码清单7 respond的实现
-
private void respond(final Encodable result) {
-
final SocketAddress remoteAddress = ();
-
(result).addListener(
-
new ChannelFutureListener() {
-
@Override
-
public void operationComplete(ChannelFuture future) throws Exception {
-
if (()) {
-
("Sent result {} to client {}", result, remoteAddress);
-
} else {
-
(("Error sending result %s to %s; closing connection",
-
result, remoteAddress), ());
-
();
-
}
-
}
-
}
-
);
-
}
可以看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。
[1] Channel的writeAndFlush方法涉及Netty的实现细节及原理,这并不是本书所要阐述的内容,有兴趣的读者可以访问Netty官网:获取更多信息。
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下:
京东:/