Reposted from http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral
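As the service consumer, a Thrift client communicates over sockets, so it has to deal with several issues:
1) The client needs to know the server's IP + port; in a distributed deployment it needs the IP + port list of every server.
2) For performance, the client cannot serve concurrent requests over a single socket, and it should not create a new socket for every request either; a connection pool is needed.
3) For Java developers, configuring Thrift services through Spring is very convenient.
4) With ZooKeeper-based configuration management, the client no longer has to hard-code the server's ip + port; ZooKeeper can push the address of every service.
5) Without a connection pool the Thrift client cannot scale its concurrency effectively, so this article focuses on how to build a connection pool for the Thrift client.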
1. pom.xml
- <dependencies>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
- <!--<exclusions>-->
- <!--<exclusion>-->
- <!--<groupId>log4j</groupId>-->
- <!--<artifactId>log4j</artifactId>-->
- <!--</exclusion>-->
- <!--</exclusions>-->
- </dependency>
- <!--
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.4</version>
- </dependency>
- -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.3.0</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
- </dependencies>
2. spring-thrift-client.xml
ZooKeeper is optional here; developers can also specify the server address directly through the serverAddress property.
- <!-- fixedAddress -->
- <!--
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="serverAddress" value="127.0.0.1:9090:2"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="10000"></property>
- </bean>
- -->
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
- <property name="connectString" value="127.0.0.1:2181"></property>
- <property name="namespace" value="demo/thrift-service"></property>
- </bean>
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="1800000"></property>
- <property name="addressProvider">
- <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
- <property name="configPath" value="UserServiceImpl"></property>
- <property name="zookeeper" ref="thriftZookeeper"></property>
- </bean>
- </property>
- </bean>
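
The commented-out fixed-address bean above uses `127.0.0.1:9090:2` as its `serverAddress`. Judging from the `transfer` method in section 5, the format appears to be `ip:port[:weight]`, where an address with weight N is listed N times so that it is picked N times as often. The following is a minimal sketch of that parsing under this assumption. `AddressSpec` and `DEFAULT_WEIGHT` are hypothetical names, not classes from the original project, and unlike the original code the sketch uses `InetSocketAddress.createUnresolved` so that parsing never triggers a DNS lookup.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the original project): expands
// "ip:port[:weight]" into a list that contains the address `weight` times.
final class AddressSpec {
    static final int DEFAULT_WEIGHT = 1;

    private AddressSpec() {}

    static List<InetSocketAddress> parse(String spec) {
        String[] parts = spec.trim().split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected ip:port[:weight] but got: " + spec);
        }
        int port = Integer.parseInt(parts[1]);
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : DEFAULT_WEIGHT;
        if (weight < 1) {
            throw new IllegalArgumentException("weight must be >= 1 but got: " + weight);
        }
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        for (int i = 0; i < weight; i++) {
            // createUnresolved keeps the host string as-is and skips name resolution
            result.add(InetSocketAddress.createUnresolved(parts[0], port));
        }
        return result;
    }
}
```

With this sketch, `AddressSpec.parse("127.0.0.1:9090:2")` yields a list with two entries for the same host and port, while an address without the third field yields a single entry.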
3. ThriftServiceClientProxyFactory.java
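Because the client side uses a connection pool, method calls on the client have to be proxied. This class maintains a proxy for the "Client": on each method call it borrows a "Client" object from the object pool, and it returns that object to the pool once the actual call has finished.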
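- package com.demo.thrift;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import org.apache.commons.pool.impl.GenericObjectPool;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.beans.factory.InitializingBean;
- import com.demo.thrift.ThriftClientPoolFactory.PoolOperationCallBack;
- //ThriftServerAddressProvider and FixedAddressProvider are project-local; import them from the packages used in the attachment
- @SuppressWarnings({"rawtypes","unchecked"})
- public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {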
- private String service;
- private String serverAddress;
- private Integer maxActive = 32;//maximum number of active connections
- //connection idle time in ms, default 3 min
- //-1 disables idle checking
- private Integer idleTime = 180000;
- private ThriftServerAddressProvider addressProvider;
- private Object proxyClient;
- public void setMaxActive(Integer maxActive) {
- this.maxActive = maxActive;
- }
- public void setIdleTime(Integer idleTime) {
- this.idleTime = idleTime;
- }
- public void setService(String service) {
- this.service = service;
- }
- public void setServerAddress(String serverAddress) {
- this.serverAddress = serverAddress;
- }
- public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
- this.addressProvider = addressProvider;
- }
- private Class objectClass;
- private GenericObjectPool<TServiceClient> pool;
- private PoolOperationCallBack callback = new PoolOperationCallBack() {
- @Override
- public void make(TServiceClient client) {
- System.out.println("create");
- }
- @Override
- public void destroy(TServiceClient client) {
- System.out.println("destroy");
- }
- };
- @Override
- public void afterPropertiesSet() throws Exception {
- if(serverAddress != null){
- addressProvider = new FixedAddressProvider(serverAddress);
- }
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- //load the generated Iface interface
- objectClass = classLoader.loadClass(service + "$Iface");
- //load the generated Client.Factory class
- Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");
- TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
- ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = maxActive;
- poolConfig.minIdle = 0;
- //run validateObject() on borrow so that transports already closed locally are not handed out
- poolConfig.testOnBorrow = true;
- poolConfig.minEvictableIdleTimeMillis = idleTime;
- poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;
- pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);
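- proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- //borrow a client for the duration of this call
- TServiceClient client = pool.borrowObject();
- boolean broken = false;
- try{
- return method.invoke(client, args);
- }catch(InvocationTargetException e){
- //the call itself failed, possibly on a broken connection: conservatively do not reuse this client
- broken = true;
- //rethrow the real exception instead of the reflection wrapper
- throw e.getTargetException();
- }finally{
- if(broken){
- pool.invalidateObject(client);
- }else{
- pool.returnObject(client);
- }
- }
- }
- });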
- }
- @Override
- public Object getObject() throws Exception {
- return proxyClient;
- }
- @Override
- public Class<?> getObjectType() {
- return objectClass;
- }
- @Override
- public boolean isSingleton() {
- return true;
- }
- public void close(){
- //close the pool first so that idle connections are released
- if(pool != null){
- try{
- pool.close();
- }catch(Exception e){
- //ignored, the factory is shutting down
- }
- }
- if(addressProvider != null){
- addressProvider.close();
- }
- }
- }
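
The borrow, invoke, return pattern used by the proxy can be tried out without Thrift or commons-pool. The following is a minimal sketch using only the JDK: a `BlockingQueue` stands in for `GenericObjectPool`, and `Greeter` stands in for a generated `Iface`. Both `Greeter` and `PooledProxy` are hypothetical names introduced for illustration. Unlike a real pool, this sketch always puts the instance back, even after a failed call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a generated Thrift Iface.
interface Greeter {
    String greet(String name);
}

// Hypothetical helper: routes every interface call through a pooled instance.
final class PooledProxy {
    private PooledProxy() {}

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, final BlockingQueue<T> pool) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // blocks while every pooled instance is in use
                T target = pool.take();
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    // surface the real exception rather than the reflection wrapper
                    throw e.getCause();
                } finally {
                    // always hand the instance back; a slot is free because we took one
                    pool.put(target);
                }
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}
```

A caller would fill an `ArrayBlockingQueue<Greeter>` with a few instances, pass it to `PooledProxy.wrap(Greeter.class, queue)`, and then use the returned `Greeter` from any number of threads; at most as many calls run concurrently as there are instances in the queue.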
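4. ThriftClientPoolFactory.java
The factory behind the "Client" object pool. The pool holds Client objects that have already been instantiated; each Client object is responsible for communicating with the Thrift server.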
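- package com.demo.thrift;
- import java.net.InetSocketAddress;
- import org.apache.commons.pool.BasePoolableObjectFactory;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.apache.thrift.protocol.TBinaryProtocol;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.transport.TSocket;
- import org.apache.thrift.transport.TTransport;
- //ThriftServerAddressProvider is project-local; import it from the package used in the attachment
- /**
- * Connection pool factory, thrift-client for spring
- */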
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{
- private final ThriftServerAddressProvider addressProvider;
- private final TServiceClientFactory<TServiceClient> clientFactory;
- private PoolOperationCallBack callback;
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
- this.addressProvider = addressProvider;
- this.clientFactory = clientFactory;
- }
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {
- this.addressProvider = addressProvider;
- this.clientFactory = clientFactory;
- this.callback = callback;
- }
- @Override
- public TServiceClient makeObject() throws Exception {
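- InetSocketAddress address = addressProvider.selector();
- //selector() returns null when no server address is known yet
- if(address == null){
- throw new IllegalStateException("No thrift server address available");
- }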
- TSocket tsocket = new TSocket(address.getHostName(),address.getPort());
- TProtocol protocol = new TBinaryProtocol(tsocket);
- TServiceClient client = this.clientFactory.getClient(protocol);
- tsocket.open();
- if(callback != null){
- try{
- callback.make(client);
- }catch(Exception e){
- //
- }
- }
- return client;
- }
- public void destroyObject(TServiceClient client) throws Exception {
- if(callback != null){
- try{
- callback.destroy(client);
- }catch(Exception e){
- //
- }
- }
- TTransport pin = client.getInputProtocol().getTransport();
- pin.close();
- }
- public boolean validateObject(TServiceClient client) {
- TTransport pin = client.getInputProtocol().getTransport();
- return pin.isOpen();
- }
- static interface PoolOperationCallBack {
- //invoked before a client is destroyed
- void destroy(TServiceClient client);
- //invoked after a client has been created successfully
- void make(TServiceClient client);
- }
- }
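
To make the roles of `makeObject`, `validateObject` and `destroyObject` more concrete, here is a deliberately tiny pool written against the JDK only. It is a sketch of the lifecycle, not of commons-pool itself: `Lifecycle` and `TinyPool` are hypothetical names, there is no size limit and no eviction thread, and validation always runs on borrow. In commons-pool 1.6, by contrast, `validateObject` is only consulted when one of `testOnBorrow`, `testOnReturn` or `testWhileIdle` is enabled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical hooks mirroring makeObject / validateObject / destroyObject.
interface Lifecycle<T> {
    T make();
    boolean validate(T obj);
    void destroy(T obj);
}

// Hypothetical, minimal pool: unbounded, no idle eviction.
final class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<T>();
    private final Lifecycle<T> lifecycle;

    TinyPool(Lifecycle<T> lifecycle) {
        this.lifecycle = lifecycle;
    }

    synchronized T borrow() {
        T candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (lifecycle.validate(candidate)) {
                return candidate;           // still usable, reuse it
            }
            lifecycle.destroy(candidate);   // stale, e.g. its transport was closed
        }
        return lifecycle.make();            // nothing reusable, create a fresh one
    }

    synchronized void giveBack(T obj) {
        idle.addFirst(obj);
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

In terms of the class above, `make` corresponds to opening a `TSocket` and building the client, `validate` to checking `isOpen()` on the transport, and `destroy` to closing it.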
5. DynamicAddressProvider.java
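ZooKeeper acts as the provider of server addresses, so the client does not need to list a pile of ip + port entries in its configuration file, and it does not need to be reconfigured when the servers change.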
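- package com.demo.thrift.support.impl;
- import java.net.InetSocketAddress;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- import java.util.Set;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.imps.CuratorFrameworkState;
- import org.apache.curator.framework.recipes.cache.ChildData;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache;
- import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
- import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
- import org.springframework.beans.factory.InitializingBean;
- //ThriftServerAddressProvider is project-local; import it from the package used in the attachment
- /**
- * Addresses can be obtained dynamically. Possible designs:
- * 1) call a web service periodically to fetch the addresses
- * 2) use an event-listening mechanism to receive the latest addresses passively (for example over MQ or NIO)
- * 3) use the ZooKeeper watcher mechanism to obtain the latest addresses
- * <p/>
- * This example uses ZooKeeper as the "config" center and the Apache Curator library to simplify ZooKeeper development.
- * The implementation below is for reference only.
- */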
- public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {
- private String configPath;
- private PathChildrenCache cachedPath;
- private CuratorFramework zookeeper;
- //records every address this provider has seen
- //when the ZooKeeper cluster fails, the addresses in trace can serve as a "backup"
- private Set<String> trace = new HashSet<String>();
- private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
- private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
- private Object lock = new Object();
- private static final Integer DEFAULT_PRIORITY = 1;
- public void setConfigPath(String configPath) {
- this.configPath = configPath;
- }
- public void setZookeeper(CuratorFramework zookeeper) {
- this.zookeeper = zookeeper;
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- //start the ZooKeeper client if it has not been started yet
- if(zookeeper.getState() == CuratorFrameworkState.LATENT){
- zookeeper.start();
- }
- buildPathChildrenCache(zookeeper, configPath, true);
- cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
- }
- private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
- cachedPath = new PathChildrenCache(client, path, cacheData);
- cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- PathChildrenCacheEvent.Type eventType = event.getType();
- switch (eventType) {
- // case CONNECTION_RECONNECTED:
- //
- // break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_LOST:
- System.out.println("Connection error,waiting...");
- return;
- default:
- //
- }
- //any change to the data of any node triggers a full rebuild; this is the "simple" approach.
- cachedPath.rebuild();
- rebuild();
- }
- protected void rebuild() throws Exception {
- List<ChildData> children = cachedPath.getCurrentData();
- if (children == null || children.isEmpty()) {
- //all thrift servers may have lost their connection to ZooKeeper
- //while the network between thrift client and thrift server is still fine,
- //so whether the container should be cleared here is a trade-off.
- synchronized (lock) {
- container.clear();
- }
- System.out.println("thrift server-cluster error....");
- return;
- }
- List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
- List<String> seen = new ArrayList<String>();
- for (ChildData data : children) {
- String address = new String(data.getData(), "utf-8");
- current.addAll(transfer(address));
- seen.add(address);
- }
- Collections.shuffle(current);
- //all shared state is updated under the same lock
- synchronized (lock) {
- trace.addAll(seen);
- container.clear();
- container.addAll(current);
- inner.clear();
- inner.addAll(current);
- }
- }
- });
- }
- private List<InetSocketAddress> transfer(String address){
- String[] hostname = address.split(":");
- Integer priority = DEFAULT_PRIORITY;
- if (hostname.length == 3) {
- priority = Integer.valueOf(hostname[2]);
- }
- String ip = hostname[0];
- Integer port = Integer.valueOf(hostname[1]);
- List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
- for (int i = 0; i < priority; i++) {
- result.add(new InetSocketAddress(ip, port));
- }
- return result;
- }
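- @Override
- public List<InetSocketAddress> getAll() {
- //return a snapshot so that callers never observe a list that is being rebuilt
- synchronized (lock) {
- return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
- }
- }
- @Override
- public InetSocketAddress selector() {
- //use the same lock as rebuild(); a synchronized method would lock on "this" and race with it
- synchronized (lock) {
- if (inner.isEmpty()) {
- if(!container.isEmpty()){
- inner.addAll(container);
- }else if(!trace.isEmpty()){
- //nothing from ZooKeeper: fall back to the addresses seen earlier
- for(String hostname : trace){
- container.addAll(transfer(hostname));
- }
- Collections.shuffle(container);
- inner.addAll(container);
- }
- }
- //null when no address is known yet
- return inner.poll();
- }
- }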
- @Override
- public void close() {
- try {
- cachedPath.close();
- zookeeper.close();
- } catch (Exception e) {
- //
- }
- }
- }
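
The selection logic above boils down to a weighted, shuffled rotation: every address appears once per unit of weight, a round hands out each entry once, and an empty round is refilled from the full list. The following is a minimal JDK-only sketch of that idea with a single lock guarding both the update and the read path. `WeightedRotation`, `update` and `next` are hypothetical names, and plain `host:port` strings replace `InetSocketAddress` to keep the sketch short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical helper: hands out "host:port" strings in shuffled, weighted rounds.
final class WeightedRotation {
    private final Object lock = new Object();
    // one entry per unit of weight
    private final List<String> all = new ArrayList<String>();
    // what is left of the current round
    private final Queue<String> round = new ArrayDeque<String>();

    // Replaces the address set, e.g. when the registry reports a change.
    void update(List<String> specs) {
        List<String> expanded = new ArrayList<String>();
        for (String spec : specs) {
            String[] parts = spec.split(":");
            int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
            for (int i = 0; i < weight; i++) {
                expanded.add(parts[0] + ":" + parts[1]);
            }
        }
        Collections.shuffle(expanded);
        synchronized (lock) {
            all.clear();
            all.addAll(expanded);
            round.clear();
            round.addAll(expanded);
        }
    }

    // Returns the next address, or null if no address is known.
    String next() {
        synchronized (lock) {
            if (round.isEmpty()) {
                round.addAll(all); // start a new round
            }
            return round.poll();
        }
    }
}
```

After `update(Arrays.asList("10.0.0.1:9090:2", "10.0.0.2:9090"))`, every three consecutive calls to `next()` return `10.0.0.1:9090` twice and `10.0.0.2:9090` once, in an order fixed by the shuffle.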
At this point the Thrift client should basically be up and running. For more code, see the attachment.
For Thrift server development and configuration, see [Thrift-server].