1. 什么是Thrift?
Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目。Thrift通过IDL(Interface Definition Language,接口定义语言,使得该框架支持跨语言调用)来定义RPC(Remote Procedure Call,远程过程调用)的接口和数据类型,然后通过thrift编译器生成不同语言的代码(目前支持C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现。
Thrift 是完全静态化的,当数据结构发生变化时,必须重新编辑IDL文件、代码生成再编译载入的流程,跟其他IDL工具相比较可以视为是 Thrift 的弱项。Thrift 适用于搭建大型数据交换及存储的通用工具,在大型系统中的内部数据传输上相对于 JSON 和 XML 无论在性能、传输大小上有明显的优势。
Thrift 不仅仅是个高效的序列化工具,它是一个完整的 RPC 框架体系!
2. RPC框架
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。简单来说,就是客户端在不必知道调用细节的前提之下,调用远程计算机上运行的某个对象,使用起来就像调用本地的对象一样。
目前典型的RPC实现框架有:Thrift(facebook开源)、Dubbo(alibaba开源)等等。
衡量RPC框架性能的标准:
1. 网络I/O模型:RPC服务器,可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型,在高并发的状态下,处理性能上会有很大的差别。RPC框架针对网络协议、网络I/O模型的封装是透明的,对于调用的客户端而言,它就认为自己在调用本地的一个对象。
2. 传输协议:是基于TCP协议、还是HTTP协议、还是UDP协议?对性能也有一定的影响。大多数RPC开源实现框架都是基于TCP、或者HTTP的,目测没有采用UDP协议做为主要的传输协议的。
RPC本质上是一种 Inter-process communication(IPC)——进程间通信的形式。常见的进程间通信方式如管道、共享内存是同一台物理机上的两个进程间的通信,而RPC就是两个在不同物理机上的进程之间的通信。概括的说,RPC就是在一台机器上调用另一台机器上的方法,这种调用在远程机器上对代码的执行就像在本机上对代码的执行一样,只是迁移了一个执行环境而已。
RPC是一种C/S架构的服务模型,server端提供接口供client调用,client端向server端发送数据,server端接收client端的数据进行相关计算并将结果返回给client端。
执行一次RPC的步骤(1~6)如下图所示:
3. thrift的协议栈结构
在Client和Server的最顶层都是用户自定义的处理逻辑,也就是说用户只需要编写用户逻辑,就可以完成整套的RPC调用流程。
用户逻辑的下一层是Thrift自动生成的代码,这些代码主要用于结构化数据的解析,发送和接收,同时服务器端的自动生成代码中还包含了RPC请求的转发(Client的A调用转发到Server A函数进行处理)。
协议栈的其他模块都是Thrift的运行时模块:
-
底层IO模块:负责实际的数据传输,包括Socket,文件,或者压缩数据流等。
-
TTransport:负责以字节流方式发送和接收Message,是底层IO模块在Thrift框架中的实现,每一个底层IO模块都会有一个对应TTransport来负责Thrift的字节流(Byte Stream)数据在该IO模块上的传输。例如TSocket对应Socket传输,TFileTransport对应文件传输。
-
TProtocol:主要负责将结构化数据组装成Message,或者从Message结构中读出结构化数据。TProtocol将一个有类型的数据转化为字节流以交给TTransport进行传输,或者从TTransport中读取一定长度的字节数据转化为特定类型的数据。如int32会被TBinaryProtocol Encode为一个四字节的字节数据,或者TBinaryProtocol从TTransport中取出四个字节的数据Decode为int32。
-
TServer:负责接收Client的请求,并将请求转发到Processor进行处理。TServer主要任务就是高效的接受Client的请求,特别是在高并发请求的情况下快速完成请求。
-
Processor(或者TProcessor):负责对Client的请求做出相应,包括RPC请求转发,调用参数解析和用户逻辑调用,返回值写回等处理步骤。Processor是服务器端从Thrift框架转入用户逻辑的关键流程。Processor同时也负责向Message结构中写入数据或者读出数据。
利用Thrift用户只需要做三件事:
(1). 利用IDL定义数据结构及服务
(2). 利用代码生成工具将(1)中的IDL编译成对应语言(如C++、JAVA),编译后得到基本的框架代码
(3). 在(2)中框架代码基础上完成完整代码(纯C++代码、JAVA代码等)
为了实现上述RPC协议栈,Thrift定义了一套IDL,封装了server相关类, processor相关类,transport相关类,protocol相关类以及并发和时钟管理方面的库。
4. Thrift的数据类型
Thrift类型系统没有引入任何特殊的动态类型或包装器对象,也不要求开发者编写任何对象序列化或传输的代码。Thrift IDL文件在逻辑上,是开发者对他们的数据结构进行注解的一种方法,该方法告诉代码生成器怎样在语言之间安全传输对象,所需的额外信息量最小。
1)基本类型
bool 布尔值,true或false
byte 有符号字节
i16 16位有符号整数
i32 32位有符号整数
i64 64位有符号整数
double 64位浮点数
string 与编码无关的文本或二进制字符串
2)structs(结构体)
定义一个Thrift结构体的基本语法与C结构体定义非常相似。域可由一个整型域标识符(在该结构体的作用域内是唯一的),以及可选的默认值来标注。
struct Phone {
1: i32 id,
2: string number,
3: PhoneType type
}
3)enum(枚举)
enum Operation {
ADD = 1,
SUBTRACT = 2,
MULTIPLY = 3,
DIVIDE = 4
}
4)Containers(容器)
Thrift容器是强类型的,映射为通用编程语言中最常使用的容器。使用C++模板类来标注。有三种可用类型:
list<type>:映射为STL vector,Java ArrayList,或脚本语言中的native array。。
set<type>: 映射为为STL set,Java HashSet,Python中的set,或PHP/Ruby中的native dictionary。
Map<type1,type2>:映射为STL map,Java HashMap,PHP associative array,或Python/Ruby dictionary。
在目标语言中,定义将产生有read和write两种方法的类型,使用Thrift TProtocol对象对对象进行序列化和传输。
5)Exceptions(异常)
异常在语法和功能上都与结构体相同,唯一的区别是它们使用exception关键词,而非struct关键词进行声明。 生成的对象继承自各目标编程语言中适当的异常基类,以便与任何给定语言中的本地异常处理无缝地整合。
exception InvalidOperation {
1: i32 whatOp,
2: string why
}
6)Services(服务)
使用Thrift类型定义服务。对一个服务的定义在语法上等同于在面向对象编程中定义一个接口(或一个纯虚抽象类)。Thrift编译器生成实现该接口的客户与服务器存根。服务的定义如下:
service <name> {
<returntype> <name>(<arguments>)
[throws (<exceptions>)]
...
}例如:
service StringCache {
void set(1:i32 key, 2:string value),
string get(1:i32 key) throws (1:KeyNotFound knf),
void delete(1:i32 key)
}
注意: 除其他所有定义的Thrift类型外,void也是一个有效的函数返回类型。void函数可添加一个async修饰符,产生的代码不等待服务器的应答。 一个纯void函数会向客户端返回一个应答,保证服务器一侧操作已完成。应用开发者应小心,仅当方法调用失败是可以接受的,或传输层已知可靠的情况下,才使用async优化。
5. TServer
Thrift核心库提供一个TServer抽象类。
Tserver继承关系图:
Tthread关系图:
TServer在Thrift框架中的主要任务是接收Client的请求,并转到某个TProcessor上进行请求处理。针对不同的访问规模,Thrift提供了不同的TServer模型。Thrift目前支持的Server模型包括:
1. TSimpleServer:使用阻塞IO的单线程服务器,主要用于调试
2. TThreadedServer:使用阻塞IO的多线程服务器。每一个请求都在一个线程里处理,并发访问情况下会有很多线程同时在运行。
3. TThreadPoolServer:使用阻塞IO的多线程服务器,使用线程池管理处理线程。
4. TNonBlockingServer:使用非阻塞IO的多线程服务器,使用少量线程既可以完成大并发量的请求响应,必须使用TFramedTransport。
Thrift 使用 libevent 作为服务的事件驱动器, libevent 其实就是 epoll更高级的封装而已(在linux下是epoll)。处理大量更新的话,主要是在TThreadedServer和TNonblockingServer中进行选择。TNonblockingServer能够使用少量线程处理大量并发连接,但是延迟较高;TThreadedServer的延迟较低。实际中,TThreadedServer的吞吐量可能会比TNonblockingServer高,但是TThreadedServer的CPU占用要比TNonblockingServer高很多。
TServer对象通常做如下工作:
1) 使用TServerTransport获得一个TTransport
2) 使用TTransportFactory,可选地将原始传输转换为一个适合的应用传输(典型的是使用TBufferedTransportFactory)
3) 使用TProtocolFactory,为TTransport创建一个输入和输出
4) 调用TProcessor对象的process()方法
Thrift中定义一个server的方法如下:
TSimpleServer server(
boost::make_shared<CalculatorProcessor>(boost::make_shared<CalculatorHandler>()),
boost::make_shared<TServerSocket>(9090),
boost::make_shared<TBufferedTransportFactory>(),
boost::make_shared<TBinaryProtocolFactory>());TThreadedServer server(
boost::make_shared<CalculatorProcessorFactory> (boost::make_shared<CalculatorCloneFactory>()),
boost::make_shared<TServerSocket>(9090), //port
boost::make_shared<TBufferedTransportFactory>(),
boost::make_shared<TBinaryProtocolFactory>());const int workerCount = 4;//线程池容量
boost::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
threadManager->threadFactory(boost::make_shared<PlatformThreadFactory>());
threadManager->start();TThreadPoolServer server(
boost::make_shared<CalculatorProcessorFactory>(boost::make_shared<CalculatorCloneFactory>()),
boost::make_shared<TServerSocket>(9090),
boost::make_shared<TBufferedTransportFactory>(),
boost::make_shared<TBinaryProtocolFactory>(),
threadManager);TNonBlockingServer server(
boost::make_shared<CalculatorProcessorFactory>(boost::make_shared<CalculatorCloneFactory>()),
boost::make_shared<TServerSocket>(9090),
boost::make_shared<TFramedTransportFactory>(),
boost::make_shared<TBinaryProtocolFactory>(),
threadManager);server.serve();//启动server
5.1 TSimpleServer工作模式
5.2 TThreadPoolServer工作模式
TThreadPoolServer模式优点:
线程池模式中,数据读取和业务处理都交由线程池完成,主线程只负责监听新连接,因此在并发量较大时新连接也能够被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
TThreadPoolServer模式缺点:
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
5.3 TNonblockingServer模式
TNonblockingServer模式优点:
相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,同时监控多个socket的状态变化;
TNonblockingServer模式缺点:
TNonblockingServer模式在业务处理上还是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。
5.4 THsHaServer模式(半同步半异步)
THsHaServer的优点:
与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升;
THsHaServer的缺点:
主线程需要完成对所有socket的监听以及数据读写的工作,当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。
5.5 TThreadedSelectorServer模式
TThreadedSelectorServer模式是目前Thrift提供的*的模式,它内部有如果几个部分构成:
(1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
(2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是由这些线程来完成;
(3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
(4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;
TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;TThreadedSelectorServer对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用TThreadedSelectorServer就可以。
6. TTransport
TTransport是与底层数据传输紧密相关的传输层。每一种支持的底层传输方式都存在一个与之对应的TTransport。在TTransport这一层,数据是按字节流(Byte Stream)方式处理的,即传输层看到的是一个又一个的字节,并把这些字节按照顺序发送和接收。TTransport并不了解它所传输的数据是什么类型,实际上传输层也不关心数据是什么类型,只需要按照字节方式对数据进行发送和接收即可。数据类型的解析在TProtocol这一层完成。
TTransport具体的有以下几个类:
TSocket:使用阻塞的TCP Socket进行数据传输,也是最常见的模式
THttpTransport:采用Http传输协议进行数据传输
TFileTransport:文件(日志)传输类,允许client将文件传给server,允许server将收到的数据写到文件中
TZlibTransport:与其他的TTransport配合使用,压缩后对数据进行传输,或者将收到的数据解压下面几个类主要是对上面几个类地装饰(采用了装饰模式),以提高传输效率。
TBufferedTransport:对某个Transport对象操作的数据进行buffer,即从buffer中读取数据进行传输,或者将数据直接写入buffer
TFramedTransport:同TBufferedTransport类似,也会对相关数据进行buffer,同时,它支持定长数据发送和接收(按块的大小,进行传输)。
TMemoryBuffer:从一个缓冲区中读写数据
Thrift实现中,一个关键的设计选择就是将传输层从代码生成层解耦。从根本上,生成的Thrift代码只需要知道如何读和写数据。数据的源和目的地无关紧要,可以是一个socket,一段共享内存,或本地磁盘上的一个文件。TTransport(Thrift transport)接口支持以下方法:
open Opens the tranpsort
close Closes the tranport
isOpen Indicates whether the transport is open
read Reads from the transport
write Writes to the transport
flush Forces any pending writes
除以上的TTransport接口外,还有一个TServerTransport接口,用来接受或创建原始传输对象。它的接口如下:
open Opens the transport
listen Begins listening for connections
accept Returns a new client transport
close Closes the transport
7. TProtocol
TProtocol的主要任务是把TTransport中的字节流转化为数据流(Data Stream),在TProtocol这一层就会出现具有数据类型的数据,如整型,浮点数,字符串,结构体等。TProtocol中数据虽然有了数据类型,但是TProtocol只会按照指定类型将数据读出和写入,而对于数据的真正用途,需要在Thrift自动生成的Server和Client中里处理。
Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数。常用协议有以下几种:
TBinaryProtocol: 二进制格式
TCompactProtocol: 高效率的、密集的二进制编码格式
TJSONProtocol: 使用 JSON 的数据编码协议进行数据传输
TSimpleJSONProtocol: 提供JSON只写协议, 生成的文件很容易通过脚本语言解析。
TDebugProtocol: 使用易懂的可读的文本格式,以便于debug
TCompactProtocol 高效的编码方式,使用了类似于ProtocolBuffer的Variable-Length Quantity (VLQ) 编码方式,主要思路是对整数采用可变长度,同时尽量利用没有使用Bit。对于一个int32并不保证一定是4个字节编码,实际中可能是1个字节,也可能是5个字节,但最多是五个字节。TCompactProtocol并不保证一定是最优的,但多数情况下都会比TBinaryProtocol性能要更好。
TProtocol接口非常直接,它根本上支持两件事: 1) 双向有序的消息传递; 2) 基本类型、容器及结构体的编码。
writeMessageBegin(name, type, seq)
writeMessageEnd()
writeStructBegin(name)
writeStructEnd()
writeFieldBegin(name, type, id)
writeFieldEnd()
writeFieldStop()
writeMapBegin(ktype, vtype, size)
writeMapEnd()
writeListBegin(etype, size)
writeListEnd()
writeSetBegin(etype, size)
writeSetEnd()
writeBool(bool)
writeByte(byte)
writeI16(i16)
writeI32(i32)
writeI64(i64)
writeDouble(double)
writeString(string)
name, type, seq = readMessageBegin()
readMessageEnd()
name = readStructBegin()
readStructEnd()
name, type, id = readFieldBegin()
readFieldEnd()
k, v, size = readMapBegin()
readMapEnd()
etype, size = readListBegin()
readListEnd()
etype, size = readSetBegin()
readSetEnd()
bool = readBool()
byte = readByte()
i16 = readI16()
i32 = readI32()
i64 = readI64()
double = readDouble()
string = readString()
注意到每个write函数有且仅有一个相应的read方法。WriteFieldStop()是一个特殊的方法,标志一个结构的结束。读一个结构的过程是readFieldBegin()直到遇到stop域,然后readStructEnd()。生成的代码依靠这个调用顺序,来确保一个协议编码器所写的每一件事,都可被一个相应的协议解码器读取。 这组功能在设计上更加注重健壮性,而非必要性。例如,writeStructEnd()不是严格必需的,因为一个结构体的结束可用stop域表示。
8. TProcessor/Processor
Processor是由Thrift生成的TProcessor的子类,主要对TServer中一次请求的 InputProtocol和OutputTProtocol进行操作,也就是从InputProtocol中读出Client的请求数据,向OutputProtcol中写入用户逻辑的返回值。Processor是TServer从Thrift框架转到用户逻辑的关键流程。同时TProcessor.process是一个非常关键的处理函数,因为Client所有的RPC调用都会经过该函数处理并转发。
TProcessor对于一次RPC调用的处理过程可以概括为:
TServer接收到RPC请求之后,调用TProcessor.process进行处理。
TProcessor.process首先调用TTransport.readMessageBegin接口,读出RPC调用的名称和RPC调用类型。如果RPC调用类型是RPC Call,则调用TProcessor.process_fn继续处理,对于未知的RPC调用类型,则抛出异常。
TProcessor.process_fn根据RPC调用名称到自己的processMap中查找对应的RPC处理函数。如果存在对应的RPC处理函数,则调用该处理函数继续进行请求响应。不存在则抛出异常。
a) 在这一步调用的处理函数,并不是最终的用户逻辑。而是对用户逻辑的一个包装。
b) processMap是一个标准的std::map。Key为RPC名称。Value是对应的RPC处理函数的函数指针。 processMap的初始化是在Processor初始化的时候进行的。Thrift虽然没有提供对processMap做修改的API,但是仍可以通过继承TProcessor来实现运行时对processMap进行修改,以达到打开或关闭某些RPC调用的目的。
RPC处理函数是RPC请求处理的最后一个步骤,它主要完成以下三个步骤:
a) 调用RPC请求参数的解析类,从TProtocol中读入数据完成参数解析。不管RPC调用的参数有多少个,Thrift都会将参数放到一个Struct中去。Thrift会检查读出参数的字段ID和字段类型是否与要求的参数匹配。对于不符合要求的参数都会跳过。这样,RPC接口发生变化之后,旧的处理函数在不做修改的情况,可以通过跳过不认识的参数,来继续提供服务。进而在RPC框架中提供了接口的多Version支持。
b) 参数解析完成之后,调用用户逻辑,完成真正的请求响应。
c) 用户逻辑的返回值使用返回值打包类进行打包,写入TProtocol。
9. ThriftClient
ThriftClient跟TProcessor一样都主要操作InputProtocol和OutputProtocol,不同的是ThritClient将RPC调用分为Send和receive两个步骤。
Send步骤,将用户的调用参数作为一个整体的Struct写入TProcotol,并发送到TServer。
Send结束之后,ThriftClient便立刻进入Receive状态等待TServer的相应。对于TServer返回的响应,使用返回值解析类进行返回值解析,完成RPC调用。
10. Thrift RPC Version
Thrift的RPC接口支持不同Version之间的兼容性。需要注意的是:
1. 不要修改已经存在数据的字段编号
2. 新加的字段必须是optional的。以保证新生成的代码可以序列旧的Message。同时尽量为新加的字段添加默认值。
3. Required字段不能被删除。可以字段前加上"OBSOLETE_"来提醒后续用户该字段已经不再使用,同时字段编号不能复用。
4. 修改默认值对Version控制没有影响。因为默认值不会被传输,而是由数据的接收者来决定。
12. Thrift Generator
Thrift自动生成代码的代码框架被直接HardCode到了代码生成器里,因此对生成代码的结构进行修改需要重新编译Thrift,并不是十分方便。如果Thrift将代码结构保存到一个模板文件里,修改生成代码就会相对容易一些。
自动生成的代码就会遵守一定的命名规则。Thrift中几种主要的命名规则为:
1. IDLName + ”_types.h” :用户自定义数据类型头文件
2. IDLName + ”_constants.h” :用户自定义的枚举和常量数据类型头文件
3. ServiceName + “.h” :Server端Processor定义和Client定义头文件
4. ServericeName + ”_” + RPC名称 + “_args” :服务器端RPC参数解析类
5. ServericeName + ”_” + RPC名称 + “_result” :服务器端RPC返回值打包类
6. ServericeName + ”_” + RPC名称 + “_pargs” :客户端RPC参数打包类
7. ServericeName + ”_” + RPC名称 + “_presult” :客户端RPC返回值解析类
8. “process_” + RPC名称:服务器端RPC调用处理函数
9. “send_” + RPC名称:客户端发送RPC请求的方法
10. “recv_” + RPC名称:客户端接收RPC返回的方法
客户端和服务器的参数解析和返回值解析虽然针对的是同样的数据结构,但是Thrift并没有使用同一个类来完成任务,而是将客户端和服务器的解析类分开。
13. 版本化(Versioning)
Thrift面对版本化和数据定义的改变是健壮的。将阶段性的改变推送到已部署的服务中的能力至关重要。系统必须能够支持从日志文件中读取旧数据,以及过时的客户(服务器)向新的服务器(客户)发送的请求。
Field Identifiers(域标识符)
Thrift的版本化通过域标识符实现。Thrift中,一个结构体的每一个成员的域头都用一个唯一的域标识符编码。域标识符和类型说明符结合起来,唯一地标志该域。Thrift定义语言支持域标识符的自动分配,但最好始终显式地指定域标识符。标识符的指定如下所示:
struct Example {
1:i32 number=10,
2:i64 bigNumber,
3:double decimals,
4:string name="thrifty"
}
为避免手动和自动分配的标识符之间的冲突,省略了标识符的域所赋的标识符从-1开始递减,本语言对正的标识符仅支持手动赋值。
函数参数列表里能够、并且应当指定域标识符。事实上,参数列表不仅在后端表现为结构,实际上在编译器前端也表现为与结构体同样的代码。这允许我们对方法参数进行版本安全的修改。
service StringCache {
void set(1:i32 key, 2:string value),
string get(1:i32 key) throws (1:KeyNotFound knf),
void delete(1:i32 key)
}
Isset
如果遇到了一个预料之外的域,它可被安全地忽视并丢弃。当一个预期的域找不到时,必须有某些方法告诉开发者该域不在。这是通过定义的对象内部的一个isset结构实现的。(Isset功能在PHP里默认为null,Python里为None,Ruby里为nil)。 各个Thrift结构内部的isset对象为各个域包含一个布尔值,表示该域在结构中是否存在。接收一个结构时,应当在直接对其进行操作之前,先检查一个域是否已设置(being set)。
class Example {
public:
Example() :
number(10),
bigNumber(0),
decimals(0),
name("thrifty") {}int32_t number;
int64_t bigNumber;
double decimals;
std::string name;struct __isset {
__isset() : number(false), bigNumber(false), decimals(false), name(false) {};
bool number;
bool bigNumber;
bool decimals;
bool name;
} __isset;
...
}
Case Analysis(案例分析)
有四种可能发生版本不匹配的情况:
1.已添加字段,旧客户端,新服务器:旧客户端没有发送新字段,新服务器识别到那个新字段未置位,执行对于旧数据请求的默认操作。
2.已删除字段,旧客户端,新服务器:旧客户端发送已被删除的字段,新服务器简单忽略这个字段。
3.已添加字段,新客户端,旧服务器:新客户端发送了旧服务器不能识别的字段,旧服务器简单的忽略,并按正常请求处理。
4.已删除字段,新客户端,旧服务器:最危险的情况,对于丢失的字段,旧服务器不大可能有默认的合适动作执行。这种情况,推荐在升级客户端之前升级服务器端。
实现细节
Target Languages(目标语言):Thrift当前支持五种目标语言:C++,Java,Python,Ruby和PHP。
Servers and Multithreading(服务器和多线程):为处理来自多个客户机的同时的请求,Thrift服务要求基本的多线程。Thrift实现了自己的多线程运行时库:
ThreadManager:ThreadManager负责管理工作线程池,一旦有空闲的线程,应用就可以调度任务来执行。ThreadManager并未实现动态线程池大小的调整,但提供了原语,以便应用能基于负载添加和移除线程。Thrift把复杂的API抽象留给特定应用,提供原语以制定所期望的策略,并对当前状态进行采样。
TimerManager:允许应用在未来某个时间点调度Runnable对象以执行。它具体的工作是允许应用定期对ThreadManager的负载进行抽样,并根据应用的方针使线程池大小发生改变。TimerManager也能用于生成任意数量的定时器或告警事件。 TimerManager的默认实现,使用了单个线程来处理过期的Runnable对象。因此,如果一个定时器操作需要做大量工作,尤其是如果它需要阻塞I/O,则应当在一个单独的线程中完成。
Nonblocking Operation(非阻塞操作):尽管Thrift传输接口更直接地映射到一个阻塞I/O模型,然而Thrift基于libevent和TFramedTransport,用C++实现了一个高性能的TNonBlockingServer。这是通过使用状态机,把所有I/O移动到一个严密的事件循环中来实现的。实质上,事件循环将成帧的请求读入TMemoryBuffer对象。一旦全部请求ready,它们会被分发给TProcessor对象,该对象能直接读取内存中的数据。
Compiler(编译器):Thrift编译器是使用C++实现的。尽管若用另一种语言来实现,代码行数可能会少,但使用C++能够强制语言结构的显示定义,使代码对新的开发者来说更容易接近。 代码生成使用两遍pass完成。第一遍只看include文件和类型定义。这一阶段,并不检查类型定义,因为它们可能依赖于include文件。第一次pass,所有包含的文件按顺序被扫描一遍。一旦解析了include树,第二遍pass过所有文件,将类型定义插入语法树,如果有任何未定义的类型,则引发一个error。然后,根据语法树生成程序。 由于固有的复杂性以及潜在的循环依赖性,Thrift显式地禁止前向声明。两个Thrift结构不能各自包含对方的一个实例。
TFileTransport:TFileTransport通过将来的数据及数据长度成帧,并将它写到磁盘上,来对Thrift的请求/结构作日志。使用一个成帧的磁盘上格式,允许了更好的错误检查,并有助于处理有限数目的离散事件。TFileWriterTransport使用一个交换内存中缓冲区的系统,来确保作大量数据的日志时的高性能。一个Thrift日志文件被分裂成某一特定大小的块,被记入日志的信息不允许跨越块的边界。如果有一个可能跨越块边界的消息,则添加填塞直到块的结束,并且消息的第一个字节与下一个块的开始对齐。将文件划分成块,使从文件的一个特定点读取及解释数据成为可能。
13. 示例
a. 生成thrift代码
a.1. 定义thrift文件
/*userinfo.thrift*/
namespace java com.maociyuan.cnblogs.home.userinfo
/*结构体类型:*/
struct UserInfo
{
1:i32 userid,
2:string username,
3:string userpwd,
4:string sex,
5:string age,
}
/*服务类型:*/
service UserInfoService{
UserInfo lg_userinfo_getUserInfoById(1:i32 userid),
string lg_userinfo_getUserNameById(1:i32 userid),
i32 lg_userinfo_getUserCount(),
bool lg_userinfo_checkUserById(1:i32 userid),
}
a.2. 编译thrift文件
在命令窗口,进入thrift文件目录执行如下命令
thrift-0.8.0.exe -r -gen java ./userinfo.thrift
生成两个java文件(在IDE中若安装了maven插件则可以使用命令:mvn genthrift:run)
UserInfo.java
UserInfoService.java
导入工程中:
b. 实现接口Iface
package com.maociyuan.cnblogs.home.userinfo;
import java.util.HashMap;
import java.util.Map;
import org.apache.thrift.TException;
public class UserInfoServiceImpl implements UserInfoService.Iface {
private static Map<Integer, UserInfo> userMap = new HashMap<Integer, UserInfo>();
static {
userMap.put(1, new UserInfo(1,"mao","mao","男","2016"));
userMap.put(2, new UserInfo(2,"ci","ci","女","07"));
userMap.put(3, new UserInfo(3,"yuan","yuan","男","28"));
}
public UserInfo lg_userinfo_getUserInfoById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.get(userid);
}
public String lg_userinfo_getUserNameById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.get(userid).getUsername();
}
public int lg_userinfo_getUserCount() throws TException {
// TODO Auto-generated method stub
return userMap.size();
}
public boolean lg_userinfo_checkUserById(int userid) throws TException {
// TODO Auto-generated method stub
return userMap.containsKey(userid);
}
}
c. 服务端代码
package com.maociyuan.cnblogs.home.service;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService;
import com.maociyuan.cnblogs.home.userinfo.UserInfoServiceImpl;
public class UserInfoServiceDemo {
public static final int SERVER_PORT = 8090;
public static final int SERVER_PORT1 = 8091;
public static final int SERVER_PORT2 = 8092;
public static final int SERVER_PORT3 = 8093;
// 简单的单线程服务模型,一般用于测试
public void startSimleServer() {
try {
System.out.println("UserInfoServiceDemo TSimpleServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
//线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
public void startTThreadPoolServer() {
try {
System.out.println("UserInfoServiceDemo TThreadPoolServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TServerSocket serverTransport = new TServerSocket(SERVER_PORT1);
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(tprocessor);
ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
// 线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
TServer server = new TThreadPoolServer(ttpsArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
// 线程池服务模型,使用标准的阻塞式IO,使用非阻塞式IO
public void startTNonblockingServer() {
try {
System.out.println("UserInfoServiceDemo TNonblockingServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT2);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
// 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TNonblockingServer(tnbArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
//半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。
public void startTHsHaServer() {
try {
System.out.println("HelloWorld THsHaServer start ....");
TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT3);
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
server.serve();
} catch (Exception e) {
System.out.println("Server start error!!!");
e.printStackTrace();
}
}
public static void main(String[] args) {
UserInfoServiceDemo server = new UserInfoServiceDemo();
//server.startSimleServer();
//server.startTThreadPoolServer();
//server.startTNonblockingServer();
//server.startTHsHaServer();
}
}
d. 客户端代码
UserInfoClientDemo.java
package com.maociyuan.cnblogs.home.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService;
import com.maociyuan.cnblogs.home.userinfo.UserInfoService.AsyncClient.lg_userinfo_getUserNameById_call;
public class UserInfoClientDemo {
public static final String SERVER_IP = "localhost";
public static final int SERVER_PORT = 8090;
public static final int SERVER_PORT1 = 8091;
public static final int SERVER_PORT2 = 8092;
public static final int SERVER_PORT3 = 8093;
public static final int TIMEOUT = 30000;
public void startClient(int userid) {
TTransport transport = null;
try {
transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
// TProtocol protocol = new TCompactProtocol(transport);
// TProtocol protocol = new TJSONProtocol(transport);
UserInfoService.Client client = new UserInfoService.Client(protocol);
transport.open();
String result = client.lg_userinfo_getUserNameById(userid);
System.out.println("Thrify client result =: " + result);
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
//处理服务端返回值为null问题
if (e instanceof TApplicationException
&& ((TApplicationException) e).getType() ==
TApplicationException.MISSING_RESULT) {
System.out.println("The result of lg_userinfo_getUserNameById function is NULL");
}
} finally {
if (null != transport) {
transport.close();
}
}
}
public void startClientAsync(int userid,int port) {
TNonblockingTransport transport = null;
try {
TAsyncClientManager clientManager = new TAsyncClientManager();
transport = new TNonblockingSocket(SERVER_IP,port, TIMEOUT);
TProtocolFactory tprotocol = new TCompactProtocol.Factory();
UserInfoService.AsyncClient asyncClient = new UserInfoService.AsyncClient(
tprotocol, clientManager, transport);
System.out.println("Client start .....");
CountDownLatch latch = new CountDownLatch(1);
AsynCallback callBack = new AsynCallback(latch);
System.out.println("call method sayHello start ...");
asyncClient.lg_userinfo_getUserNameById(userid, callBack);
System.out.println("call method sayHello .... end");
boolean wait = latch.await(30, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("startClient end.");
}
public class AsynCallback implements AsyncMethodCallback<lg_userinfo_getUserNameById_call>{
private CountDownLatch latch;
public AsynCallback(CountDownLatch latch) {
this.latch = latch;
}
public void onComplete(lg_userinfo_getUserNameById_call response) {
System.out.println("onComplete");
try {
Thread.sleep(1000L * 1);
System.out.println("AsynCall result =:" + response.getResult().toString());
} catch (TException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
public void onError(Exception exception) {
System.out.println("onError :" + exception.getMessage());
latch.countDown();
}
}
public static void main(String[] args) {
UserInfoClientDemo client = new UserInfoClientDemo();
client.startClient(1);
client.startClient(2);
client.startClient(3);
client.startClientAsync(1,SERVER_PORT2);
client.startClientAsync(2,SERVER_PORT3);
client.startClientAsync(3,SERVER_PORT2);
}
}
14. 服务端和客户端具体的调用流程
图.服务端启动、服务时序图:
程序调用了 TThreadPoolServer 的 serve 方法后,server 进入阻塞监听状态,其阻塞在 TServerSocket 的 accept 方法上。当接收到来自客户端的消息后,服务器发起一个新线程处理这个消息请求,原线程再次进入阻塞状态。在新线程中,服务器通过 TBinaryProtocol 协议读取消息内容,调用 HelloServiceImpl 的 helloVoid 方法,并将结果写入 helloVoid_result 中传回客户端。
图.客户端调用服务时序图:
该图所示是 HelloServiceClient 调用服务的过程以及接收到服务器端的返回值后处理结果的过程。从图中我们可以看到,程序调用了 Hello.Client 的 helloVoid 方法,在 helloVoid 方法中,通过 send_helloVoid 方法发送对服务的调用请求,通过 recv_helloVoid 方法接收服务处理请求后返回的结果。
参考:
https://blog.csdn.net/kesonyk/article/details/50924489
https://www.cnblogs.com/cyfonly/p/6059374.html
https://blog.csdn.net/sunmenggmail/article/details/46818147
转载于:https://my.oschina.net/liyurong/blog/1844975