在上一篇介绍Apache thrift 安装和使用,写了一个简单的demo,讲解thrift服务的发布和客户端调用,但只是单向的客户端发送消息,服务端接收消息。而客户端却得不到服务器的响应。
在不涉及语言平台的制约,WebService可胜任做这些服务端的处理。
基于大部分业务需求,更需要服务端能够响应处理数据。下面我通过一个demo案例,介绍下Apache thrift 双向通信的使用。
一.首先我们还是需要安装好Apache thrift。这里不再赘述,戳这里查看我上篇文章的介绍:http://www.cnblogs.com/sumingk/articles/6073105.html
二.其次准备好thrift 所需的jar包:
三.新建一个Java web项目,编写thrift脚本,命名为student.thrift 如下:
namespace java com.zhj.student typedef i32 int typedef i16 short typedef i64 long //Student Entity struct Student { 1: string name } service Zthrift { oneway void send(1:Student msg) }
四.执行student.thrift 文件,thrift --gen java student.thrift (该文件我还是放在c盘根目录下执行),随后生产gen-java文件,如下:
五.将新生成的两文件拷入项目中,其中Student.java 是实体类,Zthrift.java是生成的类。
六.编写thrift服务端类。
package com.zhj.server; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import com.zhj.student.Student; import com.zhj.student.Zthrift; import com.zhj.student.Zthrift.Iface; public class ZServer { public static void main(String[] args){ try { TServerSocket tServerSocket=new TServerSocket(9999); TThreadPoolServer.Args targs=new TThreadPoolServer.Args(tServerSocket); TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory(); //获取processFactory TProcessorFactory tProcessorFactory= getProcessorFactory(); targs.protocolFactory(factory); targs.processorFactory(tProcessorFactory); TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(targs); System.out.println("start server..."); tThreadPoolServer.serve(); } catch (TTransportException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 内部类获取 getProcessorFactory * @return */ public static int tt= 0; public static TProcessorFactory getProcessorFactory(){ TProcessorFactory tProcessorFactory=new TProcessorFactory(null){ public TProcessor getProcessor(final TTransport tTransport){ Thread thread = new Thread(new Runnable() { @Override public void run() { try { System.out.println("服务端休眠5秒后,执行响应......"); //延时五秒回复(延迟执行给客户端发送消息) Thread.sleep(5000); tt +=100; System.out.println("延时五秒回复时,tt = " +tt); //这里可以把client提取作为成员变量来多次使用 Zthrift.Client client = new Zthrift.Client(new TBinaryProtocol(tTransport)); //给客户端响应消息 client.send(new Student("....test")); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); thread.start(); return new Zthrift.Processor<Iface>(new Iface() { @Override public void send(Student msg) throws TException { // TODO Auto-generated method stub tt+=10; System.out.println("接收客户端消息时,tt = " +tt); //接受客户端消息 System.out.println("....."+msg.toString()); } }); } }; return tProcessorFactory; } }
此处,内部类使用比较频繁,阅读会有些困难。Zthrift,Processor构造方法需要传入一个Iface 接口,该接口有一个接收客户端的方法send(), msg 是一个Student对象。
七.实现的客户端调用。如下:
package com.zhj.client; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import com.zhj.student.Student; import com.zhj.student.Zthrift.Iface; import com.zhj.student.Zthrift; public class ZClient { public static void main(String[]args){ final TSocket tSocket=new TSocket("127.0.0.1",9999); Zthrift.Client client=new Zthrift.Client(new TBinaryProtocol(tSocket)); try { tSocket.open(); runMethod(tSocket); //向服务端发送消息 client.send(new Student("小明1")); } catch (TTransportException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void runMethod(final TSocket tSocket){ Thread thread = new Thread(new Runnable() { @Override public void run() { Zthrift.Processor<Iface> mp = new Zthrift.Processor<Zthrift.Iface>(new Iface() { @Override public void send(Student msg) throws TException { // TODO Auto-generated method stub Long start = System.currentTimeMillis(); try { while(true){ //具体接收时间待定 if((System.currentTimeMillis()-start)>0.1*60*1000){ System.out.println("响应消息超时..."); break; } else { System.out.println("收到服务端响应消息: "+msg); } //休眠两秒 Thread.sleep(2000L); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); try { while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket))){ //阻塞式方法,不需要内容 System.out.println("走阻塞式方法"); //关闭tScoket // tSocket.close(); } } catch (TException e) { System.out.println("连接已断开..."); e.printStackTrace(); } } }); thread.start(); } }
在这里,我加入了一个超时响应的死循环,用于接收服务端返回的消息,控制台可以查看服务端给的响应消息。
八.运行服务端和客户端main方法,控制台打印如下:
代码阅读有些困难,有困难或不合理之处,请小伙伴指出。Thank you!