Hadoop之RPC
通信双方遵循的契约
2.Hadoop中RPC通信原理
它是通过调用RPC类中的静态方法waitForProxy()方法而得到了InterTrackerProtocol的一个代理,借助于这个代理对象,TaskTracker就可以与JobTracker进行通信了。
跟踪Hadoop的源代码,我们可以发现PRC.waitForProxy()最终是调用的Proxy.newProxyInstance()来创建一个代理对象,第一个参数是类加载器(代理类在运行的过程中动态生成),第二个参数是要实现的代理类的接口,第三个参数是InvokercationHandler接口的子类,最终调用的也就是InvokercationHandler实现类的的invoker()方法。
我们可以看到,InvocationHandler的实现类Invoker中主要包含两个成员变量即remoteId(唯一标识RPC的服务器端)、Client(通过工厂模式得到的客户端),invoke()方法中最重要的就是下面的语句:
ObjectWritable value = (ObjectWritable)client.call( new Invocation(method, args), remoteId);
|
其中call方法的第一个参数封装调用方法和参数并实现Writable接口的对象,以便于在分布式环境中传输,第二个参数勿需多言,它就用于唯一标识RPC Server,也就是与指定的Server进行通信。call方法的核心代码如下:
其中竟然出现了一个Call对象,我们看到此方法返回的结果是call对象的一个成员变量,也就是说Call封装了Client的请求以及Server的响应,synchronized的使用会同步Client的请求以及Server的响应。通Connection对象的sendParam方法可以将请求发送给Server,那么Connection又是什么呢?
其实Connection是扩展Thread而得到的一个线程,最终把所有的connection对象都放入到一个Hashtable中,同一个ConnectionId的Connection可以复用,降低了创建线程的开销。connection.setupIOstreams()用于在真正的建立连接,并将RPC的header写入到输出流中,通过start方法启动线程,其核心代码如下所示:
public void run() {
while (waitForWork()) { //等到可以读响应是返回true
receiveResponse();
} <span style= "font-family: verdana, Arial, Helvetica, sans-serif; font-size: 14px; line-height: 1.5;" > </span>
|
receiveResponse方法主要是从输入流反序列化出value,并将其封装在call对象中,这样client端就得到了server的响应,核心代码如下:
才疏学浅,错误之处在所难免,恳请各位予以指正。。
High Level Concurrency Objects
So far, this lesson has focused on the low-level APIs that have been part of the Java platform from the very beginning. These APIs are adequate for very basic tasks, but higher-level building blocks are needed for more advanced tasks. This is especially true for massively concurrent applications that fully exploit today's multiprocessor and multi-core systems.
In this section we'll look at some of the high-level concurrency features introduced with version 5.0 of the Java platform. Most of these features are implemented in the new java.util.concurrent
packages. There are also new concurrent data structures in the Java Collections Framework.
- Lock objects support locking idioms that simplify many concurrent applications.
-
Executors define a high-level API for launching and managing threads. Executor implementations provided by
java.util.concurrent
provide thread pool management suitable for large-scale applications. - Concurrent collections make it easier to manage large collections of data, and can greatly reduce the need for synchronization.
- Atomic variables have features that minimize synchronization and help avoid memory consistency errors.
-
ThreadLocalRandom
(in JDK 7) provides efficient generation of pseudorandom numbers from multiple threads.
译文:
高性能并发对象
到目前为止,这个课集中精力介绍了初级的java平台的API.这些API对于一些基本的任务,但是高性能的程序块对于一些更高级的任务来说是需要的。这个尤其是对于当今多处理器和多核系统需要处理高并发的应用程序。
在这一节我们将会看到许多java 5.0中引进的高性能的并发特性。许多这些特性都在 java.util.concurrent包中。这里也还有些并发的数据结构在java集合框架中。
- 锁对象,适用于许多并发程序的锁对象条目
- 执行性器,定义一个能载入和管理线程的高性能的API。执行器由java.util.concurrent提供,
- 提供对于大规模应用程序的线程池管理。
- 并发集合,使管理并发的数据集更加容易,并且能够有效的减少synchronization的需求。
- 原子性变量,具有最小的同步特性并能有效的避免内存一致性错误。
- 本地随机线程(JDK 7)对于多线程提供有效的生成伪随机数的方法。