选择了java,为什么不用现成的rmi ?
答案就是:使用rpc可以对连接进行精细控制、超时、缓冲区处理等进行精确处理,rmi在这方面难以提供。
Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。
动态代理例子: http://weixiaolu.iteye.com/blog/1477774
Client和Server之间的控制信息是通过RPC机制完成的。DataNode端是通过获得NameNode的代理,通过该代理和NameNode进行通信的。Client通过RPC机制、动态代理机制调用Server端的方法,Server端将方法调用结果返回给Client。
RPC.Invocation是一个方法调用类。包含以下变量:
private String methodName;//方法名继承了Writeable,所以可以用于序列化传输。readFields和write方法分别借助ObjectWriteable类对上述三种变量的序列化和反序列化。
private Class[] parameterClasses;//参数类型
private Object[] parameters;//参数值
private Configuration conf;
RPC.ClientCache是客户端的缓存,用于维护客户端集合,定义如下:
private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();在getClient中,如果SocketFactory对应的client不存在,就new一个,并将其加入clients中。
VersionMismatch是一个异常类,当Client的协议版本与Server的协议版本不匹配时会抛出此异常。
InvocationHandler 是代理实例的调用处理程序实现的接口。
每个代理实例都具有一个关联的调用处理程序。对代理实例调用方法时,将对方法调用进行编码并将其指派到它的调用处理程序的 invoke 方法。
Invoker是InvocationHandler接口的实例,Invoker.invoke是处理方法的流程是:通过调用client.call方法,将Invocation方法调用类作为参数发给Server。
在Client.Connection.SetParams中发送给Server的参数其实是Invocation方法调用类,Server接收到这个方法调用类后,在Server.Handler处理线程中调用Server.call方法,而Server.call是抽象方法,实际上调用的方法实例是RPC.Server.call方法,该方法中Object value = method.invoke(instance, call.getParameters());就是根据Invocation方法调用类执行相应的Server的方法。
public Writable call(Class<?> protocol, Writable param, long receivedTime)
throws IOException {
try {
Invocation call = (Invocation)param;//对应着通过RPC.invoke发送的Invocation方法调用实例,RPC.invoke通过Client.call发送
//Invocation给Server,Server.Handler通过调用本方法进行处理Invocation
if (verbose) log("Call: " + call);
Method method =
protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);//使用单一安全性检查(为了提高效率)为一组对象设置 accessible 标志的便捷方法。
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());//对带有指定参数的指定对象instance调用由此 Method 对象表示的底层方法。
int processingTime = (int) (System.currentTimeMillis() - startTime);
...
}
============================================================================
Server要实现VersionProtocol接口,此接口用于客户端协议版本和Server端协议版本的校验。
Client可通过waitForProxy获得一个远程代理与Server通信,这样Client就可以调用Server上的方法。waitForProxy实际上就是getProxy(...)+失败重试。
getProxy是一个动态代理,其核心代码如下:
VersionedProtocol proxy =通过Proxy.newProxyInstance获取一个代理实例,会将proxy调用的方法method传到Invoker.invoke中,Invoker.invoke将method和实参args封装成Invocation方法调用类,并调用Client.call将此Invocation类发给Server,Server接收到此Call时,在Handler处理线程中将调用RPC.call对此call进行处理,RPC.call会调用
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
Object value = method.invoke(instance, call.getParameters());执行客户端所传过来的程序,并将执行结果返回给Server.Handler,Server.Handler将此返回值包装成一个Call后加入到responseQueue,Server.Responsdr线程从responseQueue中取出call,在call的通道可写的情况下调用doAsynWrite将响应值写到通道传给Client。
此时Connection.run开始receiveResponse,读出Server的响应值,并调用call.setValue向Client.call发出notify,此时Client.call中的call.wait()激活,将处理结果返回到Invoker.invoke,再返回给最初的客户端调用者。
调用例子: http://www.cnblogs.com/hiddenfox/archive/2011/12/30/2305786.html
Ps.HDFS中定义了一些接口协议用于一端调用另一端的功能,如DN调用DN,DN调用NN,client调用DN等,被调用端实现了这些协议中的相应方法。这样调用端只要通过RPC框架的方式就可以调用被调用者协议中的特定方法了。