Hadoop RPC机制&完整调用流程

时间:2021-06-24 22:07:17
RPC框架利用的Java的反射能力,该RPC框架要求调用的参数和返回结果必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出IOException异常。RPC是一个命令式的接口,客户端通过RPC框架将要执行的方法封装成参数发送给服务器,服务器这个方法后将执行结果返回给客户端。


选择了java,为什么不用现成的rmi ?
答案就是:使用rpc可以对连接进行精细控制、超时、缓冲区处理等进行精确处理,rmi在这方面难以提供。


Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。 
动态代理例子: http://weixiaolu.iteye.com/blog/1477774


Client和Server之间的控制信息是通过RPC机制完成的。DataNode端是通过获得NameNode的代理,通过该代理和NameNode进行通信的。Client通过RPC机制、动态代理机制调用Server端的方法,Server端将方法调用结果返回给Client。
Hadoop RPC机制&完整调用流程

RPC.Invocation是一个方法调用类。包含以下变量:
    private String methodName;//方法名
private Class[] parameterClasses;//参数类型
private Object[] parameters;//参数值
private Configuration conf;
继承了Writeable,所以可以用于序列化传输。readFields和write方法分别借助ObjectWriteable类对上述三种变量的序列化和反序列化。


RPC.ClientCache是客户端的缓存,用于维护客户端集合,定义如下:
private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();
在getClient中,如果SocketFactory对应的client不存在,就new一个,并将其加入clients中。




VersionMismatch是一个异常类,当Client的协议版本与Server的协议版本不匹配时会抛出此异常。




InvocationHandler 是代理实例的调用处理程序实现的接口。 
每个代理实例都具有一个关联的调用处理程序。对代理实例调用方法时,将对方法调用进行编码并将其指派到它的调用处理程序的 invoke 方法。 



Invoker是InvocationHandler接口的实例,Invoker.invoke是处理方法的流程是:通过调用client.call方法,将Invocation方法调用类作为参数发给Server。
在Client.Connection.SetParams中发送给Server的参数其实是Invocation方法调用类,Server接收到这个方法调用类后,在Server.Handler处理线程中调用Server.call方法,而Server.call是抽象方法,实际上调用的方法实例是RPC.Server.call方法,该方法中Object value = method.invoke(instance, call.getParameters());就是根据Invocation方法调用类执行相应的Server的方法。
public Writable call(Class<?> protocol, Writable param, long receivedTime) 
throws IOException {
try {
Invocation call = (Invocation)param;//对应着通过RPC.invoke发送的Invocation方法调用实例,RPC.invoke通过Client.call发送
//Invocation给Server,Server.Handler通过调用本方法进行处理Invocation
if (verbose) log("Call: " + call);


Method method =
protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);//使用单一安全性检查(为了提高效率)为一组对象设置 accessible 标志的便捷方法。


long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());//对带有指定参数的指定对象instance调用由此 Method 对象表示的底层方法。
int processingTime = (int) (System.currentTimeMillis() - startTime);
...
}


============================================================================
Server要实现VersionProtocol接口,此接口用于客户端协议版本和Server端协议版本的校验。
Client可通过waitForProxy获得一个远程代理与Server通信,这样Client就可以调用Server上的方法。waitForProxy实际上就是getProxy(...)+失败重试。
getProxy是一个动态代理,其核心代码如下:
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
通过Proxy.newProxyInstance获取一个代理实例,会将proxy调用的方法method传到Invoker.invoke中,Invoker.invoke将method和实参args封装成Invocation方法调用类,并调用Client.call将此Invocation类发给Server,Server接收到此Call时,在Handler处理线程中将调用RPC.call对此call进行处理,RPC.call会调用
Object value = method.invoke(instance, call.getParameters());
执行客户端所传过来的程序,并将执行结果返回给Server.Handler,Server.Handler将此返回值包装成一个Call后加入到responseQueue,Server.Responsdr线程从responseQueue中取出call,在call的通道可写的情况下调用doAsynWrite将响应值写到通道传给Client。
此时Connection.run开始receiveResponse,读出Server的响应值,并调用call.setValue向Client.call发出notify,此时Client.call中的call.wait()激活,将处理结果返回到Invoker.invoke,再返回给最初的客户端调用者。 
调用例子: http://www.cnblogs.com/hiddenfox/archive/2011/12/30/2305786.html


   
Ps.HDFS中定义了一些接口协议用于一端调用另一端的功能,如DN调用DN,DN调用NN,client调用DN等,被调用端实现了这些协议中的相应方法。这样调用端只要通过RPC框架的方式就可以调用被调用者协议中的特定方法了。