Hadoop版本Hadoop2.6
RPC主要分为3个部分:(1)交互协议 (2)客户端(3)服务端
(3)服务端
RPC服务端的实例代码:
public class Starter {
public static void main(String[] args) throws IOException {
RPC.Builder build = new RPC.Builder(new Configuration());
build.setBindAddress("localhost").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());
RPC.Server server = build.build();
server.start();
}
}
RPC 服务端主要通过NIO来处理客户端发来的请求。
RPC服务端涉及的类主要有
org.apache.hadoop.ipc.Server(抽象类,server的最顶层类,与客户端的链接,响应,数据传输)
org.apache.hadoop.ipc.RPC.Server(RPC的内部类,是一个抽象类,主要涉及将请求交给动态代理)
org.apache.hadoop.ipc.WritableRpcEngine.Server(WritableRpcEngine的内部类,是一个静态类,动态代理的具体实现,得到请求结果)
先看ipc.Server类,它有几个主要的内部类分别是:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
(1)启动服务
如上述服务端实例代码所示,当调用函数start(),RPC服务端就启动起来了,我们先看看start()里面有什么
/** Starts the service. Must be called before any calls will be handled. */
//该实现在ipc.Server类中
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
可以看出,Server端通过启动Listener监听客户端发来的请求,启动responder响应客户端发来的请求,启动多个线程Handler循环阻塞读取请求交给responder响应。
先看看如何监听客户端发来的请求
(2)监听客户端的连接
Listner初始化,构造函数如下所示,通过Java NIO创建一个ServerSocketChannel监听本地地址和端口,并设置成非阻塞模式,并注册成接听客户端连接事件,并启动多个Reader线程,同时将读客户端请求数据的事件交给Reader实现。
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
} // Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
Listener:Listener是一个Thread类的子类,通过start()启动该线程,并运行该线程的run()方法:
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();//获取连接事件
while (iter.hasNext()) {//遍历连接事件
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())//如果该事件是连接事件
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info("Stopping " + Thread.currentThread().getName()); synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) { } selector= null;
acceptChannel= null; // close all connections
connectionManager.stopIdleScan();
connectionManager.closeAll();
}
}
//对连接事件进行处理
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true); Reader reader = getReader();
Connection c = connectionManager.register(channel);//把与客户端连接的管道封装成Connection
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);//将客户端请求连接的通道加入到Reader,监听用户发来的请求。
}
}
Reader:Reader也是Thread类的子类,通过start()启动该线程,并运行该线程的run()方法:
public void run() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
}
}
} private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();//在Listner类中通过reader.addConnection(c)加入到该阻塞队列中
for (int i=size; i>0; i--) {//遍历处理用户的连接事件,并监听用户发来的请求
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select(); Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {//判断该事件是不是读事件,并处理该读事件
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
}
}
}
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();//因为Connection也保存着与客户端的连接,因此这里提取了Connection,把处理细节交给Connection
if (c == null) {
return;
}
c.setLastContact(Time.now()); try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
// a WrappedRpcServerException is an exception that has been sent
// to the client, so the stacktrace is unnecessary; any other
// exceptions are unexpected internal server errors and thus the
// stacktrace should be logged
LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
c.getHostAddress() + " threw exception [" + e + "]",
(e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
closeConnection(c);
c = null;
}
else {
c.setLastContact(Time.now());
}
}
Connection类中通过channelRead(channel, data)读取客户端发送的数据,并将读取的数据通过processOneRpc(data.array())方法处理逻辑过程,
processOneRpc通过调用processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis)方法,
在processRpcRequest方法中封装成Call数据对象,并加入callQueue.put(call)队列中。
(3)实现客户端的请求的服务,得到客户端请求的结果数据
Handler类实现客户端请求的服务,通过从callQueue队列获取客户端RPC请求Call对象,并调用抽象方法call处理请求。
抽象方法call的实现在WritableRpcEngine类的内部类Server类,Server类是继承RPC.Server,是RPC服务调用的具体实现并定义相关协议
下面是call方法的具体实现:
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime)
throws IOException, RPC.VersionMismatch { Invocation call = (Invocation)rpcRequest;
if (server.verbose) log("Call: " + call); // Verify writable rpc version
if (call.getRpcVersion() != writableRpcVersion) {
// Client is using a different version of WritableRpc
throw new RpcServerException(
"WritableRpc version mismatch, client side version="
+ call.getRpcVersion() + ", server side version="
+ writableRpcVersion);
} long clientVersion = call.getProtocolVersion();
final String protoName;
ProtoClassProtoImpl protocolImpl;
if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
// VersionProtocol methods are often used by client to figure out
// which version of protocol to use.
//
// Versioned protocol methods should go the protocolName protocol
// rather than the declaring class of the method since the
// the declaring class is VersionedProtocol which is not
// registered directly.
// Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol(
RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protocolName);
}
protocolImpl = highest.protocolTarget;
} else {
protoName = call.declaringClassProtocolName; // Find the right impl for the protocol based on client version.
ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl =
server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
protoName);
if (highest == null) {
throw new RpcServerException("Unknown protocol: " + protoName);
} else { // protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
}
} // Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
exception = (IOException)target;
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
exception = ioe;
throw ioe;
}
} catch (Throwable e) {
if (!(e instanceof IOException)) {
LOG.error("Unexpected throwable object ", e);
}
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
exception = ioe;
throw ioe;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
}
}
通过调用RPC服务得到客户端请求的结果value,使用Responder类将结果返回给客户端。下面是Handler类run()方法中部分代码
CurCall.set(null);//处理完call请求,故将当前处理call表示标志为Null
synchronized (call.connection.responseQueue) {
// setupResponse() needs to be sync'ed together with
// responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces
// its own message ordering.
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);//将返回值封装到输出流中 // Discard the large buf and reset it back to smaller size
// to free up heap
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call);//将返回请求call加入队列中,然后在Responder类中一一处理。
}
(4)返回客户端的请求结果
Responder类:负责返回客户端请求的结果,通过NIO注册OP_WRITE写事件,将结果返回给客户端。
每处理完一个请求,就会调用Responder中doRespond方法处理请求结果。
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}
可以看到上述方法又调用processResponse方法处理请求结果,这里将请求结果对象call注册OP_WRITE写事件,通过NIO返回给客户端;
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
而Responder的run方法和doRunLoop方法检测OP_WRITE写事件,并通过doAsyncWrite方法处理该事件
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
doRunLoop();
} finally {
LOG.info("Stopping " + Thread.currentThread().getName());
try {
writeSelector.close();
} catch (IOException ioe) {
LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
}
}
} private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls. while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = Time.now();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not been sent out for a
// long time, discard them.
//
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
ArrayList<Call> calls; // get the list of channels from list of keys.
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
} for(Call call : calls) {
doPurge(call, now);
}
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn("Out of Memory in server select", e);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
LOG.warn("Exception in Responder", e);
}
}
}
而doAsyncWrite方法还是通过调用processResponse方法处理结果,processResponse方法调用channelWrite将结果返回给客户端,当不能一次性返回时,在processResponse方法里将返回结果再次注册OP_WRITE写事件,因而形成一个循环使得数据能全部返回给客户端。