JAVA RPC (六) 之手把手从零教你写一个生产级RPC之client的代理

时间:2023-03-08 18:51:41

首先对于RPC来讲,最主要的无非三点【SERVER IO模型】、【序列化协议】、【client连接池复用】,之前的博客大家应该对thrift有一个大致的了解了,那么我们现在来说一说如何将thrift的序列化和传输使用到生产中。先放一张作者自己写的一个rpc架构图。

JAVA RPC (六) 之手把手从零教你写一个生产级RPC之client的代理

分成几个主要部分:

1:server启动zk注册

2:client监听watch节点变动维护本地缓存,构建tcp连接池。

3:通过java aop代理获得接口代理实现,从而通过thrift序列化传输二进制自定义协议的字节流

4:server通过reactor主从模型来实现高性能服务端。

5:调用端数据上报,形成trace链路,数据大盘,TP99,可用率等。

先看一下使用方式

客户端有两种,第一种xml 使用

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:koalas="http://www.koalas.com/schema/ch"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.koalas.com/schema/ch
http://www.koalas.com/schema/ch.xsd"> <koalas:client id="koalasService"
serviceInterface="thrift.service.koalasService"
zkPath="127.0.0.1:2181"/> </beans>
package thrift.service;

import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import thrift.domain.KoalasRequest;
import thrift.domain.koalasRespone; @Service("testService")
public class TestService { @Autowired
KoalasTestService.Iface koalastestService; public void getRemoteRpc() throws TException { KoalasRequest request= new KoalasRequest ( );
request.setxxxx1 ( 1 );
request.setxxxx2( 1 );
request.setxxxxx3 ( 1 );
request.setxxxx4 ( "你好" );
request.setxxxx5 ( 1 );
KoalastestRespone respone = koalastestService.getRPC ( request);
System.out.println (respone);
}
}

第二种使用方式 注解形式

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:koalas="http://www.koalas.com/schema/ch"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.koalas.com/schema/ch
http://www.koalas.com/schema/ch.xsd"> <koalas:annotation package="thrift.annotation.client.impl"/>
</beans>
@Service("testServiceSync")
public class TestServiceSync { @KoalasClient(zkPath = "127.0.0.1:2181",readTimeout = 5000*1000)
KoalasTestService.Iface koalastestService; public void getRemoteRpc() throws TException {
KoalasRequest request= new KoalasRequest ( );
//request.setSource ( 10 );
request.setxxxxx1 ( 1 );
request.setxxxxx2 ( 1 );
request.setxxxxx3 ( 1 );
request.setxxxxx4 ( "你好啊-我是注解实现的" );
request.setxxxxx5 ( 1 );
KoalasRespone respone = koalastestService.getRPC ( request);
System.out.println (respone);
} }

简单吧,一行xml配合一个注解,client的实现这样就完成了。。。关于服务端的使用去看我git上面的wiki吧,博客中不做多说明了,这次主要讲源码。

首先看client的主要代理类入口client.proxyfactory.KoalasClientProxy,这个类的作用是所有的client服务端通过thrift的代理都通过他来完成,放出源码如下

package client.proxyfactory;

import client.cluster.ILoadBalancer;
import client.cluster.Icluster;
import client.cluster.impl.DirectClisterImpl;
import client.cluster.impl.RandomLoadBalancer;
import client.cluster.impl.ZookeeperClisterImpl;
import client.invoker.KoalsaMothodInterceptor;
import client.invoker.LocalMockInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import protocol.KoalasBinaryProtocol;
import transport.TKoalasFramedTransport; import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; /**
* Copyright (C) 2018
* All rights reserved
* User: yulong.zhang
* Date:2018年09月18日17:44:58
*/
public class KoalasClientProxy implements FactoryBean<Object>, ApplicationContextAware, InitializingBean {
private final static Logger logger = LoggerFactory.getLogger ( KoalasClientProxy.class );
public static final String ASYNC_IFACE = "AsyncIface";
public static final String IFACE = "Iface";
public static final String CLIENT = "Client";
public static final String ASYNC_CLIENT = "AsyncClient";
//请求体最大长度
public static final int DEFUAL_MAXLENGTH = 10 * 1024 * 1024;
//连接超时
public static final int DEFUAL_CONNTIMEOUT = 5*1000;
//读取超时
public static final int DEFUAL_READTIMEOUT = 30*1000; //client端service
private Class<?> serviceInterface;
// 方式1:zk管理的动态集群,格式192.168.3.253:6666
private String zkPath;
// 方式2:指定的server列表,逗号分隔,#分隔权重,格式192.168.3.253:6666#10,192.168.3.253:6667#10
private String serverIpPorts; //代理对象,所有client-server类型统一代理
private Object loalsServiceProxy;
//spring上下文对象
private ApplicationContext applicationContext;
// 同步还是异步,默认同步。
private boolean async = false;
//连接超时时间
private int connTimeout=DEFUAL_CONNTIMEOUT;
//读取超时时间
private int readTimeout=DEFUAL_READTIMEOUT;
//本地client测试用实现
private String localMockServiceImpl;
//重试
private boolean retryRequest = true;
private int retryTimes = 3;
private GenericObjectPoolConfig genericObjectPoolConfig;
//最大连接数
private int maxTotal=100;
//最大闲置数
private int maxIdle=50;
//最小闲置数量
private int minIdle=10;
private boolean lifo = true;
private boolean fairness = false;
private long maxWaitMillis = 30 * 1000;
//多长时间运行一次
private long timeBetweenEvictionRunsMillis = 3 * 60 * 1000;
private long minEvictableIdleTimeMillis = 5 * 60 * 1000; //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,
//不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
private long softMinEvictableIdleTimeMillis = 10 * 60 * 1000;
private int numTestsPerEvictionRun = 20;
private boolean testOnCreate = false;
private boolean testOnBorrow = false;
private boolean testOnReturn = false;
private boolean testWhileIdle = true;
private Icluster icluster;
private ILoadBalancer iLoadBalancer;
private String env="dev";
AbandonedConfig abandonedConfig;
private boolean removeAbandonedOnBorrow = true;
private boolean removeAbandonedOnMaintenance = true;
private int removeAbandonedTimeout = 30;
private int maxLength_ = DEFUAL_MAXLENGTH;
private static int cores = Runtime.getRuntime().availableProcessors();
private int asyncSelectorThreadCount = cores * 2;
private static List<TAsyncClientManager> asyncClientManagerList = null; private String privateKey;
private String publicKey; public String getPrivateKey() {
return privateKey;
} public void setPrivateKey(String privateKey) {
this.privateKey = privateKey;
} public String getPublicKey() {
return publicKey;
} public void setPublicKey(String publicKey) {
this.publicKey = publicKey;
} public int getMaxLength_() {
return maxLength_;
} public void setMaxLength_(int maxLength_) {
this.maxLength_ = maxLength_;
} public int getMaxTotal() {
return maxTotal;
} public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
} public int getMaxIdle() {
return maxIdle;
} public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
} public int getMinIdle() {
return minIdle;
} public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
} public boolean isLifo() {
return lifo;
} public void setLifo(boolean lifo) {
this.lifo = lifo;
} public boolean isFairness() {
return fairness;
} public void setFairness(boolean fairness) {
this.fairness = fairness;
} public long getMaxWaitMillis() {
return maxWaitMillis;
} public void setMaxWaitMillis(long maxWaitMillis) {
this.maxWaitMillis = maxWaitMillis;
} public long getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
} public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
} public long getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
} public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
} public long getSoftMinEvictableIdleTimeMillis() {
return softMinEvictableIdleTimeMillis;
} public void setSoftMinEvictableIdleTimeMillis(long softMinEvictableIdleTimeMillis) {
this.softMinEvictableIdleTimeMillis = softMinEvictableIdleTimeMillis;
} public int getNumTestsPerEvictionRun() {
return numTestsPerEvictionRun;
} public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
this.numTestsPerEvictionRun = numTestsPerEvictionRun;
} public boolean isTestOnCreate() {
return testOnCreate;
} public void setTestOnCreate(boolean testOnCreate) {
this.testOnCreate = testOnCreate;
} public boolean isTestOnBorrow() {
return testOnBorrow;
} public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
} public boolean isTestOnReturn() {
return testOnReturn;
} public void setTestOnReturn(boolean testOnReturn) {
this.testOnReturn = testOnReturn;
} public boolean isTestWhileIdle() {
return testWhileIdle;
} public void setTestWhileIdle(boolean testWhileIdle) {
this.testWhileIdle = testWhileIdle;
} public String getLocalMockServiceImpl() {
return localMockServiceImpl;
} public void setLocalMockServiceImpl(String localMockServiceImpl) {
this.localMockServiceImpl = localMockServiceImpl;
} public int getReadTimeout() {
return readTimeout;
} public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
} public int getConnTimeout() {
return connTimeout;
} public void setConnTimeout(int connTimeout) {
this.connTimeout = connTimeout;
} public String getZkPath() {
return zkPath;
} public void setZkPath(String zkPath) {
this.zkPath = zkPath;
} public String getServerIpPorts() {
return serverIpPorts;
} public void setServerIpPorts(String serverIpPorts) {
this.serverIpPorts = serverIpPorts;
} public boolean isAsync() {
return async;
} public void setAsync(boolean async) {
this.async = async;
} public Object getLoalsServiceProxy() {
return loalsServiceProxy;
} public void setLoalsServiceProxy(Object loalsServiceProxy) {
this.loalsServiceProxy = loalsServiceProxy;
} public ApplicationContext getApplicationContext() {
return applicationContext;
} public Class<?> getServiceInterface() {
return serviceInterface;
} public void setServiceInterface(Class<?> serviceInterface) {
this.serviceInterface = serviceInterface;
} public ILoadBalancer getiLoadBalancer() {
return iLoadBalancer;
} public void setiLoadBalancer(ILoadBalancer iLoadBalancer) {
this.iLoadBalancer = iLoadBalancer;
} public boolean isRemoveAbandonedOnBorrow() {
return removeAbandonedOnBorrow;
} public void setRemoveAbandonedOnBorrow(boolean removeAbandonedOnBorrow) {
this.removeAbandonedOnBorrow = removeAbandonedOnBorrow;
} public boolean isRemoveAbandonedOnMaintenance() {
return removeAbandonedOnMaintenance;
} public void setRemoveAbandonedOnMaintenance(boolean removeAbandonedOnMaintenance) {
this.removeAbandonedOnMaintenance = removeAbandonedOnMaintenance;
} public int getRemoveAbandonedTimeout() {
return removeAbandonedTimeout;
} public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
this.removeAbandonedTimeout = removeAbandonedTimeout;
} public boolean isRetryRequest() {
return retryRequest;
} public void setRetryRequest(boolean retryRequest) {
this.retryRequest = retryRequest;
} public int getRetryTimes() {
return retryTimes;
} public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
} public String getEnv() {
return env;
}
public void setEnv(String env) {
this.env = env;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} @Override
public Object getObject(){
if (getLoalsServiceProxy () == null) throw new RuntimeException ( "the Proxy can't be null" );
return getLoalsServiceProxy ();
} @Override
public Class<?> getObjectType() {
if (serviceInterface == null)
return null;
return getIfaceInterface ();
} private Class<?> getIfaceInterface() {
if (async)
return getAsyncIfaceInterface ();
else
return getSynIfaceInterface ();
} private Constructor<?> synConstructor;
private Constructor<?> asyncConstructor; public Object getInterfaceClientInstance(TTransport socket,String server) { if (!async) {
Class<?> clazz = getSynClientClass ();
try {
if (synConstructor == null) {
synConstructor = clazz.getDeclaredConstructor ( TProtocol.class );
}
TTransport transport = new TKoalasFramedTransport ( socket, maxLength_ );
if(this.getPrivateKey ()!=null && this.getPublicKey () != null){
((TKoalasFramedTransport) transport).setRsa ( (byte) 1 );
((TKoalasFramedTransport) transport).setPrivateKey ( this.privateKey );
((TKoalasFramedTransport) transport).setPublicKey ( this.publicKey );
} TProtocol protocol = new KoalasBinaryProtocol ( transport ); return synConstructor.newInstance ( protocol ); } catch (NoSuchMethodException e) {
logger.error ( "the clazz can't find the Constructor with TProtocol.class" );
} catch (InstantiationException e) {
logger.error ( "get InstantiationException", e );
} catch (IllegalAccessException e) {
logger.error ( "get IllegalAccessException", e );
} catch (InvocationTargetException e) {
logger.error ( "get InvocationTargetException", e );
}
} else {
if (null == asyncClientManagerList) {
synchronized (this) {
if (null == asyncClientManagerList) {
asyncClientManagerList = new ArrayList<> ();
for (int i = 0; i < asyncSelectorThreadCount; i++) {
try {
asyncClientManagerList.add(new TAsyncClientManager());
} catch (IOException e) {
e.printStackTrace ();
}
}
}
}
}
Class<?> clazz = getAsyncClientClass (); if (asyncConstructor == null) {
try {
asyncConstructor = clazz.getDeclaredConstructor ( TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class );
} catch (NoSuchMethodException e) {
e.printStackTrace ();
}
} try {
return asyncConstructor.newInstance ( new KoalasBinaryProtocol.Factory (), asyncClientManagerList.get (socket.hashCode () % asyncSelectorThreadCount), socket );
} catch (InstantiationException e) {
logger.error ( "get InstantiationException", e );
} catch (IllegalAccessException e) {
logger.error ( "get IllegalAccessException", e );
} catch (InvocationTargetException e) {
logger.error ( "get InvocationTargetException", e );
} }
return null;
} private Class<?> getAsyncIfaceInterface() {
Class<?>[] classes = serviceInterface.getClasses ();
for (Class c : classes)
if (c.isMemberClass () && c.isInterface () && c.getSimpleName ().equals ( ASYNC_IFACE )) {
return c;
}
throw new IllegalArgumentException ( "can't find the interface AsyncIface,please make the service with thrift tools!" );
} private Class<?> getSynIfaceInterface() {
Class<?>[] classes = serviceInterface.getClasses ();
for (Class c : classes)
if (c.isMemberClass () && c.isInterface () && c.getSimpleName ().equals ( IFACE )) {
return c;
}
throw new IllegalArgumentException ( "can't find the interface Iface,please make the service with thrift tools" );
} private Class<?> getSynClientClass() {
Class<?>[] classes = serviceInterface.getClasses ();
for (Class c : classes)
if (c.isMemberClass () && !c.isInterface () && c.getSimpleName ().equals ( CLIENT )) {
return c;
}
throw new IllegalArgumentException ( "serviceInterface must contain Sub Class of Client" );
} private Class<?> getAsyncClientClass() {
Class<?>[] classes = serviceInterface.getClasses ();
for (Class c : classes)
if (c.isMemberClass () && !c.isInterface () && c.getSimpleName ().equals ( ASYNC_CLIENT )) {
return c;
}
throw new IllegalArgumentException ( "serviceInterface must contain Sub Class of AsyncClient" );
} @Override
public boolean isSingleton() {
return true;
} @Override
public void afterPropertiesSet(){ if(serviceInterface==null){
throw new IllegalArgumentException ( "serviceInterface can't be null" );
} if(zkPath==null && serverIpPorts==null){
throw new IllegalArgumentException ( "zkPath or serverIpPorts at least ones can't be null" );
} Class<?> _interface = null;
if (localMockServiceImpl != null && !StringUtils.isEmpty ( localMockServiceImpl.trim () )) {
LocalMockInterceptor localMockInterceptor = new LocalMockInterceptor ( localMockServiceImpl );
_interface = getIfaceInterface ();
ProxyFactory pf = new ProxyFactory ( _interface, localMockInterceptor );
setLoalsServiceProxy ( pf.getProxy () );
return;
} genericObjectPoolConfig = getGenericObjectPoolConfig ();
abandonedConfig = getAbandonedConfig (); if (!StringUtils.isEmpty ( serverIpPorts )) {
icluster = new DirectClisterImpl ( serverIpPorts, iLoadBalancer == null ? new RandomLoadBalancer () : iLoadBalancer, serviceInterface.getName (), async, connTimeout, readTimeout, genericObjectPoolConfig, abandonedConfig );
} else{
icluster = new ZookeeperClisterImpl ( zkPath ,iLoadBalancer == null ? new RandomLoadBalancer () : iLoadBalancer, serviceInterface.getName (),env,async,connTimeout,readTimeout,genericObjectPoolConfig,abandonedConfig);
} KoalsaMothodInterceptor koalsaMothodInterceptor = new KoalsaMothodInterceptor ( icluster, retryTimes, retryRequest, this,readTimeout );
_interface = getIfaceInterface (); loalsServiceProxy = new ProxyFactory ( _interface, koalsaMothodInterceptor ).getProxy (); logger.info ( "the service【{}】is start !", serviceInterface.getName () );
} private AbandonedConfig getAbandonedConfig() {
AbandonedConfig abandonedConfig = new AbandonedConfig ();
abandonedConfig.setRemoveAbandonedOnBorrow ( isRemoveAbandonedOnBorrow () );
abandonedConfig.setRemoveAbandonedOnMaintenance ( isRemoveAbandonedOnMaintenance () );
abandonedConfig.setRemoveAbandonedTimeout ( getRemoveAbandonedTimeout () );
return abandonedConfig;
} private GenericObjectPoolConfig getGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig ();
genericObjectPoolConfig.setMaxTotal ( getMaxTotal () );
genericObjectPoolConfig.setMinIdle ( getMinIdle () );
genericObjectPoolConfig.setMaxIdle ( maxIdle );
genericObjectPoolConfig.setMaxWaitMillis ( getMaxWaitMillis () );
genericObjectPoolConfig.setLifo ( isLifo () );
genericObjectPoolConfig.setFairness ( isFairness () );
genericObjectPoolConfig.setMinEvictableIdleTimeMillis ( getMinEvictableIdleTimeMillis () );
genericObjectPoolConfig.setSoftMinEvictableIdleTimeMillis ( getSoftMinEvictableIdleTimeMillis () );
genericObjectPoolConfig.setNumTestsPerEvictionRun ( getNumTestsPerEvictionRun () );
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis ( getTimeBetweenEvictionRunsMillis () );
genericObjectPoolConfig.setTestOnCreate ( isTestOnCreate () );
genericObjectPoolConfig.setTestOnBorrow ( isTestOnBorrow () );
genericObjectPoolConfig.setTestOnReturn ( isTestOnReturn () );
genericObjectPoolConfig.setTestWhileIdle ( isTestWhileIdle () );
return genericObjectPoolConfig;
} public void destroy(){
if(icluster!= null) icluster.destroy ();
} public static void main(String[] args) {
String a = "192.168.3.253:6666#10#thrift,192.168.3.253:6667#10#netty";
System.out.println ( Arrays.toString ( a.split ( "[^0-9a-zA-Z_\\-\\.:#]+" ) ) );
}
}

首先这个类实现了FactoryBean和InitializingBean,FactoryBean的实现方法getObject就是这个代理类本身的实现了,返回的对象为全局的setKoalasServiceProxy,注意一下这块的代码

        koalasServiceProxy = new ProxyFactory ( _interface, koalsaMothodInterceptor ).getProxy ();

可以看出koalasServiceProxy为spring代理出来的对象,代理的接口是interface,也就是我们thrift自动生成class的 xxxxxx.iface

private Class<?> getSynIfaceInterface() {
Class<?>[] classes = serviceInterface.getClasses ();
for (Class c : classes)
if (c.isMemberClass () && c.isInterface () && c.getSimpleName ().equals ( IFACE )) {
return c;
}
throw new IllegalArgumentException ( "can't find the interface Iface,please make the service with thrift tools" );
}

这也就说为什么在我们的spring bean注入的类为 xxxxx.iface,对spring有一些知识的朋友一定会了解。(不懂的去学呗)

至于反射的实现肯定是在

        KoalsaMothodInterceptor koalsaMothodInterceptor = new KoalsaMothodInterceptor ( icluster, retryTimes, retryRequest, this,readTimeout );

里面了,里面的东西是什么,敬请期待下回分解!

https://gitee.com/a1234567891/koalas-rpc

koalas-RPC 个人作品,提供大家交流学习,有意见请私信,欢迎拍砖。客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,页面流量统计等,持续为个人以及中小型公司提供可靠的RPC框架技术方案

更多学习内容请加高级java QQ群:825199617