上节说了关于通用请求代理,实际上对spring的bean引用都是通过koalasClientProxy来实现的,那么在代理方法中才是我们实际的发送逻辑,咱们先看一下原生的thrift请求是什么样的。
public void startClient(String userName) {
TTransport transport = null;
try {
//transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
//TProtocol protocol = new TCompactProtocol(transport);
// TProtocol protocol = new TJSONProtocol(transport);
Client client = new Client(protocol);
transport.open();
String result = client.sayHello(userName);
System.out.println("Thrify client result =: " + result);
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
} finally {
if (null != transport) {
transport.close();
}
}
}
用同步调用的例子来说明,首先需要new 一个TSocket对象,这个对象其实就是thrift维护的socket对象,内部封装维护了Socket模型,相信大家对原生Socket已经很了解了,这次不在过多阐述,熟悉Socket的朋友一定知道每次需要调用socket.open的时候,需要和远程server进行三个握手来建立通信,三次通信的代价是巨大的,对于跨异地调用或者是跨机房调用都是巨大的开销,我们不可能每次通信都和远程server进行三次握手,那么我们可以将已经握手的TCP连接放入连接池中,每次从连接池中获取到socket然后在进行调用,使用完毕之后再放入连接池,原理和mysql连接池DBCP和阿里的德鲁伊等等一个道理关于连接池相关的内容大家可以参照我git代码中的AbstractBaseIcluster相关实现即可。
我们将socket对象缓存到缓存池中,每次请求都是TCP复用,这样将极大的提升请求速度,这也是作为企业级RPC不可或缺的一部分。
看一下核心实现
@Override
public Object invoke(MethodInvocation invocation) throws InvocationTargetException, IllegalAccessException { Method method = invocation.getMethod ();
String methodName = method.getName ();
Object[] args = invocation.getArguments (); Class<?>[] parameterTypes = method.getParameterTypes ();
if (method.getDeclaringClass () == Object.class) {
try {
return method.invoke ( this, args );
} catch (IllegalAccessException e) {
LOG.error ( e.getMessage (), e );
return null;
}
}
if ("toString".equals ( methodName ) && parameterTypes.length == 0) {
return this.toString ();
}
if ("hashCode".equals ( methodName ) && parameterTypes.length == 0) {
return this.hashCode ();
}
if ("equals".equals ( methodName ) && parameterTypes.length == 1) {
return this.equals ( args[0] );
} boolean serviceTop =false; Transaction transaction=null;
if(TraceThreadContext.get () ==null){
serviceTop=true;
transaction = Cat.newTransaction("Service", method.getDeclaringClass ().getName ().concat ( "." ).concat ( methodName ).concat ( ".top" )); MessageTree tree = Cat.getManager().getThreadLocalMessageTree();
String messageId = tree.getMessageId(); if (messageId == null) {
messageId = Cat.createMessageId();
tree.setMessageId(messageId);
} String childId = Cat.getProducer().createRpcServerId("default"); String root = tree.getRootMessageId(); if (root == null) {
root = messageId;
}
Cat.logEvent(CatConstants.TYPE_REMOTE_CALL, "", Event.SUCCESS, childId); KoalasTrace koalasTrace = new KoalasTrace ( );
koalasTrace.setChildId (childId );
koalasTrace.setParentId ( messageId);
koalasTrace.setRootId ( root );
TraceThreadContext.set (koalasTrace);
} else{
KoalasTrace currentKoalasTrace = TraceThreadContext.get ();
String child_Id = Cat.getProducer().createRpcServerId("default");
Cat.logEvent(CatConstants.TYPE_REMOTE_CALL, "", Event.SUCCESS, child_Id);
currentKoalasTrace.setChildId ( child_Id );
}
try {
TTransport socket = null;
int currTryTimes = 0;
while (currTryTimes++ < retryTimes) {
ServerObject serverObject = icluster.getObjectForRemote ();
if (serverObject == null) throw new IllegalStateException("no server list to use :" + koalasClientProxy.getServiceInterface ().getName ());
GenericObjectPool<TTransport> genericObjectPool = serverObject.getGenericObjectPool ();
try {
long before = System.currentTimeMillis ();
socket = genericObjectPool.borrowObject ();
long after = System.currentTimeMillis ();
LOG.debug ( "get Object from pool with {} ms", after - before );
} catch (Exception e) {
if (socket != null)
genericObjectPool.returnObject ( socket );
LOG.error ( e.getMessage (), e );
if(transaction!=null)
transaction.setStatus ( e );
throw new IllegalStateException("borrowObject error :" + koalasClientProxy.getServiceInterface ().getName ());
} Object obj = koalasClientProxy.getInterfaceClientInstance ( socket, serverObject.getRemoteServer ().getServer () ); if (obj instanceof TAsyncClient) {
((TAsyncClient) obj).setTimeout ( asyncTimeOut );
if (args.length < 1) {
genericObjectPool.returnObject ( socket );
throw new IllegalStateException ( "args number error" );
} Object argslast = args[args.length - 1];
if (!(argslast instanceof AsyncMethodCallback)) {
genericObjectPool.returnObject ( socket );
throw new IllegalStateException ( "args type error" );
} AsyncMethodCallback callback = (AsyncMethodCallback) argslast;
ReleaseResourcesKoalasAsyncCallBack releaseResourcesKoalasAsyncCallBack = new ReleaseResourcesKoalasAsyncCallBack ( callback, serverObject, socket );
args[args.length - 1] = releaseResourcesKoalasAsyncCallBack; }
try {
Object o = method.invoke ( obj, args );
if (socket instanceof TSocket) {
genericObjectPool.returnObject ( socket ); }
if(transaction!=null)
transaction.setStatus ( Transaction.SUCCESS );
return o;
} catch (Exception e) {
Throwable cause = (e.getCause () == null) ? e : e.getCause (); boolean ifreturn = false;
if (cause instanceof TApplicationException) {
if (((TApplicationException) cause).getType () == 6666) {
LOG.info ( "the server{}--serverName【{}】 thread pool is busy ,retry it!", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
genericObjectPool.returnObject ( socket );
ifreturn = true;
}
Thread.yield ();
if (retryRequest)
continue;
} if (((TApplicationException) cause).getType () == 9999) {
LOG.error ( "rsa error with service--{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("rsa error with service" + serverObject.getRemoteServer ().toString ()+koalasClientProxy.getServiceInterface ().getName () );
} if (((TApplicationException) cause).getType () == 6699) {
LOG.error ( "this client is not rsa support,please get the privateKey and publickey with server--{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("this client is not rsa support,please get the privateKey and publickey with server" + serverObject.getRemoteServer ().toString ()+koalasClientProxy.getServiceInterface ().getName ());
} if (((TApplicationException) cause).getType () == TApplicationException.INTERNAL_ERROR) {
LOG.error ( "this server is error please take the error log with server--{}--serverName【{}】the remote server error message data【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName (),((TApplicationException) cause).getMessage () );
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("this server is error please take the error log with server" + serverObject.getRemoteServer ()+koalasClientProxy.getServiceInterface ().getName ());
} if (((TApplicationException) cause).getType () == TApplicationException.MISSING_RESULT) {
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
return null;
}
} if (cause instanceof RSAException) {
LOG.error ( "this client privateKey or publicKey is error,please check it! --{}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("this client privateKey or publicKey is error,please check it!" + serverObject.getRemoteServer ()+ koalasClientProxy.getServiceInterface ().getName ());
} if(cause instanceof OutMaxLengthException){
LOG.error ( (cause ).getMessage (),cause );
if (socket != null) {
genericObjectPool.returnObject ( socket );
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("to big content!" + serverObject.getRemoteServer ()+ koalasClientProxy.getServiceInterface ().getName ());
} if (cause.getCause () != null && cause.getCause () instanceof ConnectException) {
LOG.error ( "the server {}--serverName【{}】 maybe is shutdown ,retry it!", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
try {
if (socket != null) {
genericObjectPool.returnObject ( socket );
ifreturn = true;
} if (retryRequest)
continue;
} catch (Exception e1) {
LOG.error ( "invalidateObject error!", e1 );
}
} if (cause.getCause () != null && cause.getCause () instanceof SocketTimeoutException) {
LOG.error ( "read timeout SocketTimeoutException,retry it! {}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
try {
genericObjectPool.invalidateObject ( socket );
ifreturn = true;
} catch (Exception e1) {
LOG.error ( "invalidateObject error ,", e );
if(transaction!=null)
transaction.setStatus ( e1 );
throw new IllegalStateException("SocketTimeout and invalidateObject error" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
}
}
if (retryRequest)
continue;
} if(cause instanceof TTransportException){
if(((TTransportException) cause).getType () == TTransportException.END_OF_FILE){
LOG.error ( "TTransportException,END_OF_FILE! {}--serverName【{}】", serverObject.getRemoteServer (),koalasClientProxy.getServiceInterface ().getName () );
if (socket != null) {
try {
genericObjectPool.invalidateObject ( socket );
} catch (Exception e1) {
LOG.error ( "invalidateObject error", e );
if(transaction!=null)
transaction.setStatus ( e1 );
throw new IllegalStateException("TTransportException and invalidateObject error" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
}
}
if(transaction!=null)
transaction.setStatus ( cause );
throw new IllegalStateException("the remote server error!" + serverObject.getRemoteServer () + koalasClientProxy.getServiceInterface ().getName ());
}
if(cause.getCause ()!=null && cause.getCause () instanceof SocketException){
if(genericObjectPool.isClosed ()){
LOG.warn ( "serverObject {} is close!,retry it",serverObject );
if (retryRequest)
continue;
}
}
} if (socket != null && !ifreturn)
genericObjectPool.returnObject ( socket );
LOG.error ( "invoke server error,server ip -【{}】,port -【{}】--serverName【{}】", serverObject.getRemoteServer ().getIp (), serverObject.getRemoteServer ().getPort (),koalasClientProxy.getServiceInterface ().getName () );
if(transaction!=null)
transaction.setStatus ( cause );
throw e;
}
}
IllegalStateException finallyException = new IllegalStateException("error!retry time out of:" + retryTimes + "!!! " + koalasClientProxy.getServiceInterface ().getName () );
if(transaction!=null)
transaction.setStatus ( finallyException );
throw finallyException;
} finally {
if(transaction!=null)
transaction.complete ();
if(serviceTop)
TraceThreadContext.remove ();
}
}
首先是线加入cat埋点,生成服务链路。这个地方先不用关注,接下来在重试循环体中来实现发送逻辑,当连接超时异常和服务端拒绝异常等异常时会进行重试
socket = genericObjectPool.borrowObject ();
获取socket连接
Object obj = koalasClientProxy.getInterfaceClientInstance ( socket, serverObject.getRemoteServer ().getServer () );
public Object getInterfaceClientInstance(TTransport socket,String server) { if (!async) {
Class<?> clazz = getSynClientClass ();
try {
if (synConstructor == null) {
synConstructor = clazz.getDeclaredConstructor ( TProtocol.class );
}
TTransport transport = new TKoalasFramedTransport ( socket, maxLength_ );
if(this.getPrivateKey ()!=null && this.getPublicKey () != null){
((TKoalasFramedTransport) transport).setRsa ( (byte) 1 );
((TKoalasFramedTransport) transport).setPrivateKey ( this.privateKey );
((TKoalasFramedTransport) transport).setPublicKey ( this.publicKey );
} TProtocol protocol = new KoalasBinaryProtocol ( transport ); return synConstructor.newInstance ( protocol ); } catch (NoSuchMethodException e) {
logger.error ( "the clazz can't find the Constructor with TProtocol.class" );
} catch (InstantiationException e) {
logger.error ( "get InstantiationException", e );
} catch (IllegalAccessException e) {
logger.error ( "get IllegalAccessException", e );
} catch (InvocationTargetException e) {
logger.error ( "get InvocationTargetException", e );
}
} else {
if (null == asyncClientManagerList) {
synchronized (this) {
if (null == asyncClientManagerList) {
asyncClientManagerList = new ArrayList<> ();
for (int i = 0; i < asyncSelectorThreadCount; i++) {
try {
asyncClientManagerList.add(new TAsyncClientManager());
} catch (IOException e) {
e.printStackTrace ();
}
}
}
}
}
Class<?> clazz = getAsyncClientClass (); if (asyncConstructor == null) {
try {
asyncConstructor = clazz.getDeclaredConstructor ( TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class );
} catch (NoSuchMethodException e) {
e.printStackTrace ();
}
} try {
return asyncConstructor.newInstance ( new KoalasBinaryProtocol.Factory (), asyncClientManagerList.get (socket.hashCode () % asyncSelectorThreadCount), socket );
} catch (InstantiationException e) {
logger.error ( "get InstantiationException", e );
} catch (IllegalAccessException e) {
logger.error ( "get IllegalAccessException", e );
} catch (InvocationTargetException e) {
logger.error ( "get InvocationTargetException", e );
} }
return null;
}
获取Thrift发送对象,也就是原生thrift代码中的xxxxx.client,它才是最终的发送对象,然后反射调用服务端,获取结果后返回给调用方,这样一个client端的同步调用逻辑就全部完成了
https://gitee.com/a1234567891/koalas-rpc
koalas-RPC 个人作品,提供大家交流学习,有意见请私信,欢迎拍砖。客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,页面流量统计等,持续为个人以及中小型公司提供可靠的RPC框架技术方案
更多学习内容请加高级java QQ群:825199617