RPC 功能目标
RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用,在前文《浅出篇》中给出了一种实现结构,基于 stub 的结构来实现。下面我们将具体细化 stub 结构的实现。
RPC 调用分类
RPC 调用分以下两种:
- 1. 同步调用
- 客户方等待调用执行完成并返回结果。
- 2. 异步调用
- 客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。
- 若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。
异步和同步的区分在于是否等待服务端执行完成并返回结果。
RPC 结构拆解
《浅出篇》给出了一个比较粗粒度的 RPC 实现概念结构,这里我们进一步细化它应该由哪些组件构成,如下图所示。
RPC 服务方通过 RpcServer
去导出(export)远程接口方法,而客户方通过 RpcClient
去引入(import)远程接口方法。客户方像调用本地方法一样去调用远程接口方法,RPC 框架提供接口的代理实现,实际的调用将委托给代理RpcProxy
。代理封装调用信息并将调用转交给RpcInvoker
去实际执行。在客户端的RpcInvoker
通过连接器RpcConnector
去维持与服务端的通道RpcChannel
,并使用RpcProtocol
执行协议编码(encode)并将编码后的请求消息通过通道发送给服务方。
RPC 服务端接收器 RpcAcceptor
接收客户端的调用请求,同样使用RpcProtocol
执行协议解码(decode)。解码后的调用信息传递给RpcProcessor
去控制处理调用过程,最后再委托调用给RpcInvoker
去实际执行并返回调用结果。
RPC 组件职责
上面我们进一步拆解了 RPC 实现结构的各个组件组成部分,下面我们详细说明下每个组件的职责划分。
- 1. RpcServer
- 负责导出(export)远程接口
- 2. RpcClient
- 负责导入(import)远程接口的代理实现
- 3. RpcProxy
- 远程接口的代理实现
- 4. RpcInvoker
- 客户方实现:负责编码调用信息和发送调用请求到服务方并等待调用结果返回
- 服务方实现:负责调用服务端接口的具体实现并返回调用结果
- 5. RpcProtocol
- 负责协议编/解码
- 6. RpcConnector
- 负责维持客户方和服务方的连接通道和发送数据到服务方
- 7. RpcAcceptor
- 负责接收客户方请求并返回请求结果
- 8. RpcProcessor
- 负责在服务方控制调用过程,包括管理调用线程池、超时时间等
- 9. RpcChannel
- 数据传输通道
RPC 实现分析
在进一步拆解了组件并划分了职责之后,这里以在 java 平台实现该 RPC 框架概念模型为例,详细分析下实现中需要考虑的因素。
导出远程接口
导出远程接口的意思是指只有导出的接口可以供远程调用,而未导出的接口则不能。在 java 中导出接口的代码片段可能如下:
- DemoService demo = new ...;
- RpcServer server = new ...;
- server.export(DemoService.class, demo, options);
我们可以导出整个接口,也可以更细粒度一点只导出接口中的某些方法,如:
- // 只导出 DemoService 中签名为 hi(String s) 的方法
- server.export(DemoService.class, demo, "hi", new Class<?>[] { String.class }, options);
java 中还有一种比较特殊的调用就是多态,也就是一个接口可能有多个实现,那么远程调用时到底调用哪个?这个本地调用的语义是通过 jvm 提供的引用多态性隐式实现的,那么对于 RPC 来说跨进程的调用就没法隐式实现了。如果前面DemoService
接口有 2 个实现,那么在导出接口时就需要特殊标记不同的实现,如:
- DemoService demo = new ...;
- DemoService demo2 = new ...;
- RpcServer server = new ...;
- server.export(DemoService.class, demo, options);
- server.export("demo2", DemoService.class, demo2, options);
上面 demo2 是另一个实现,我们标记为 "demo2" 来导出,那么远程调用时也需要传递该标记才能调用到正确的实现类,这样就解决了多态调用的语义。
导入远程接口与客户端代理
导入相对于导出远程接口,客户端代码为了能够发起调用必须要获得远程接口的方法或过程定义。目前,大部分跨语言平台 RPC 框架采用根据 IDL 定义通过 code generator 去生成 stub 代码,这种方式下实际导入的过程就是通过代码生成器在编译期完成的。我所使用过的一些跨语言平台 RPC 框架如 CORBAR、WebService、ICE、Thrift 均是此类方式。
代码生成的方式对跨语言平台 RPC 框架而言是必然的选择,而对于同一语言平台的 RPC 则可以通过共享接口定义来实现。在 java 中导入接口的代码片段可能如下:
- RpcClient client = new ...;
- DemoService demo = client.refer(DemoService.class);
- demo.hi("how are you?");
在 java 中 'import' 是关键字,所以代码片段中我们用 refer 来表达导入接口的意思。这里的导入方式本质也是一种代码生成技术,只不过是在运行时生成,比静态编译期的代码生成看起来更简洁些。java 里至少提供了两种技术来提供动态代码生成,一种是 jdk 动态代理,另外一种是字节码生成。动态代理相比字节码生成使用起来更方便,但动态代理方式在性能上是要逊色于直接的字节码生成的,而字节码生成在代码可读性上要差很多。两者权衡起来,个人认为牺牲一些性能来获得代码可读性和可维护性显得更重要。
协议编解码
客户端代理在发起调用前需要对调用信息进行编码,这就要考虑需要编码些什么信息并以什么格式传输到服务端才能让服务端完成调用。出于效率考虑,编码的信息越少越好(传输数据少),编码的规则越简单越好(执行效率高)。我们先看下需要编码些什么信息:
- -- 调用编码 --
- 1. 接口方法
- 包括接口名、方法名
- 2. 方法参数
- 包括参数类型、参数值
- 3. 调用属性
- 包括调用属性信息,例如调用附件隐式参数、调用超时时间等
- -- 返回编码 --
- 1. 返回结果
- 接口方法中定义的返回值
- 2. 返回码
- 异常返回码
- 3. 返回异常信息
- 调用异常信息
除了以上这些必须的调用信息,我们可能还需要一些元信息以方便程序编解码以及未来可能的扩展。这样我们的编码消息里面就分成了两部分,一部分是元信息、另一部分是调用的必要信息。如果设计一种 RPC 协议消息的话,元信息我们把它放在协议消息头中,而必要信息放在协议消息体中。下面给出一种概念上的 RPC 协议消息设计格式:
- -- 消息头 --
- magic : 协议魔数,为解码设计
- header size: 协议头长度,为扩展设计
- version : 协议版本,为兼容设计
- st : 消息体序列化类型
- hb : 心跳消息标记,为长连接传输层心跳设计
- ow : 单向消息标记,
- rp : 响应消息标记,不置位默认是请求消息
- status code: 响应消息状态码
- reserved : 为字节对齐保留
- message id : 消息 id
- body size : 消息体长度
- -- 消息体 --
- 采用序列化编码,常见有以下格式
- xml : 如 webservie soap
- json : 如 JSON-RPC
- binary: 如 thrift; hession; kryo 等
格式确定后编解码就简单了,由于头长度一定所以我们比较关心的就是消息体的序列化方式。序列化我们关心三个方面:
1. 序列化和反序列化的效率,越快越好。
2. 序列化后的字节长度,越小越好。
3. 序列化和反序列化的兼容性,接口参数对象若增加了字段,是否兼容。
上面这三点有时是鱼与熊掌不可兼得,这里面涉及到具体的序列化库实现细节,就不在本文进一步展开分析了。
传输服务
协议编码之后,自然就是需要将编码后的 RPC 请求消息传输到服务方,服务方执行后返回结果消息或确认消息给客户方。RPC 的应用场景实质是一种可靠的请求应答消息流,和 HTTP 类似。因此选择长连接方式的 TCP 协议会更高效,与 HTTP 不同的是在协议层面我们定义了每个消息的唯一 id,因此可以更容易的复用连接。
既然使用长连接,那么第一个问题是到底 client 和 server 之间需要多少根连接?实际上单连接和多连接在使用上没有区别,对于数据传输量较小的应用类型,单连接基本足够。单连接和多连接最大的区别在于,每根连接都有自己私有的发送和接收缓冲区,因此大数据量传输时分散在不同的连接缓冲区会得到更好的吞吐效率。所以,如果你的数据传输量不足以让单连接的缓冲区一直处于饱和状态的话,那么使用多连接并不会产生任何明显的提升,反而会增加连接管理的开销。
连接是由 client 端发起建立并维持。如果 client 和 server 之间是直连的,那么连接一般不会中断(当然物理链路故障除外)。如果 client 和 server 连接经过一些负载中转设备,有可能连接一段时间不活跃时会被这些中间设备中断。为了保持连接有必要定时为每个连接发送心跳数据以维持连接不中断。心跳消息是 RPC 框架库使用的内部消息,在前文协议头结构中也有一个专门的心跳位,就是用来标记心跳消息的,它对业务应用透明。
执行调用
client stub 所做的事情仅仅是编码消息并传输给服务方,而真正调用过程发生在服务方。server stub 从前文的结构拆解中我们细分了 RpcProcessor
和RpcInvoker
两个组件,一个负责控制调用过程,一个负责真正调用。这里我们还是以 java 中实现这两个组件为例来分析下它们到底需要做什么?
java 中实现代码的动态接口调用目前一般通过反射调用。除了原生的 jdk 自带的反射,一些第三方库也提供了性能更优的反射调用,因此 RpcInvoker
就是封装了反射调用的实现细节。
调用过程的控制需要考虑哪些因素,RpcProcessor
需要提供什么样地调用控制服务呢?下面提出几点以启发思考:
- 1. 效率提升
- 每个请求应该尽快被执行,因此我们不能每请求来再创建线程去执行,需要提供线程池服务。
- 2. 资源隔离
- 当我们导出多个远程接口时,如何避免单一接口调用占据所有线程资源,而引发其他接口执行阻塞。
- 3. 超时控制
- 当某个接口执行缓慢,而 client 端已经超时放弃等待后,server 端的线程继续执行此时显得毫无意义。
RPC 异常处理
无论 RPC 怎样努力把远程调用伪装的像本地调用,但它们依然有很大的不同点,而且有一些异常情况是在本地调用时绝对不会碰到的。在说异常处理之前,我们先比较下本地调用和 RPC 调用的一些差异:
1. 本地调用一定会执行,而远程调用则不一定,调用消息可能因为网络原因并未发送到服务方。
2. 本地调用只会抛出接口声明的异常,而远程调用还会跑出 RPC 框架运行时的其他异常。
3. 本地调用和远程调用的性能可能差距很大,这取决于 RPC 固有消耗所占的比重。
正是这些区别决定了使用 RPC 时需要更多考量。当调用远程接口抛出异常时,异常可能是一个业务异常,也可能是 RPC 框架抛出的运行时异常(如:网络中断等)。业务异常表明服务方已经执行了调用,可能因为某些原因导致未能正常执行,而 RPC 运行时异常则有可能服务方根本没有执行,对调用方而言的异常处理策略自然需要区分。
由于 RPC 固有的消耗相对本地调用高出几个数量级,本地调用的固有消耗是纳秒级,而 RPC 的固有消耗是在毫秒级。那么对于过于轻量的计算任务就并不合适导出远程接口由独立的进程提供服务,只有花在计算任务上时间远远高于 RPC 的固有消耗才值得导出为远程接口提供服务。
总结
至此我们提出了一个 RPC 实现的概念框架,并详细分析了需要考虑的一些实现细节。无论 RPC 的概念是如何优雅,但是“草丛中依然有几条蛇隐藏着”,只有深刻理解了 RPC 的本质,才能更好地应用。
前言:
thrift是出于Facebook的rpc网络编程框架, 其对跨平台和多语言的支持优于google protobuf, 但thrift在java/c#语言上应用比较多, 资料也丰富, 在windows平台的c++这块, 资料相对较少, 而且编译也麻烦. 这篇博客主要记录对thrift在windows上的编译和使用过程, 不涉及原理, 也不具体涉及应用.如有不足, 请各位指正.
执行过程
1. 下载并安装Visual Studio
notice: visual studio 有windows版本限制, 比如visual studio 2013在windows 7就安装不了
参考网址: http://www.visualstudio.com/zh-cn/visual-studio-2013-compatibility-vs
系统: windows7 + visual studio 2012
2. boost安装/编译/链接
具体步骤如下:
*) 下载boost
1. 下载 boost_1_55_0.zip
*) 编译boost
1. 执行 bootstrap.bat
2. 执行 b2.exe (编译的时间较长, 请耐心等待)
*) 验证boost
在virtual studio的window console工程属性中添加如下:
1. 附加包含目录: $BOOST_HOME
2. 附加库目录: $BOOST_HOME\stage\lib
3. 编写如下代码进行编译/运行认证
1 #include <iostream>
2 #include <string>
3 #include <boost/regex.hpp>
4 int main()
5 {
6 boost::regex pattern("\\w+@\\w+(\\.\\w+)*");
7 std::string mail("xxx@gmail.com");
8
9 if ( boost::regex_match(mail, pattern) ) {
10 std::cout << mail << " is a valid mail address!" << std::endl;
11 } else {
12 std::cout << mail << " is not a valid mail address!" << std::endl;
13 }
14 }
安装boost和配置visual studio的参考网址如下所示:
http://blog.****.net/stanfordzhang/article/details/8587282
http://www.cnblogs.com/me115/archive/2010/10/08/1845825.html
http://www.cnblogs.com/chuncn/archive/2012/09/10/2679026.html
3. libevent的编译/安装/链接
*) 参考的编译/安装过程网页
http://blog.s135.com/libevent_windows/
*) 下载libevent
http://libevent.org/
*) 编译libevent
遇到的编译错误处理方案
http://10305101ivy.blog.163.com/blog/static/584765892012227322607/
http://blog.****.net/boyxiaolong/article/details/17057063
evutil.c添加如下行:
1 #ifdef WIN32
2 #include <winsock2.h>
3 #include <ws2tcpip.h>
4 #pragma comment(lib,"ws2_32.lib")
5 #define WIN32_LEAN_AND_MEAN
6 #include <windows.h>
7 #undef WIN32_LEAN_AND_MEAN
8 #include <io.h>
9 #include <tchar.h>
10 #endif
nmake /f Makefile.nmake
生成libevent_core.lib libevent_extras.lib libevent.lib
若遇到头文件找不到的问题, 需要手动修改Makefile.nmake文件, 添加相关的头文件路径
CFLAGS=/IWIN32-Code /Iinclude /Icompat /DWIN32 /DHAVE_CONFIG_H /I. /I"C:\Program Files (x86)\Windows Kits\8.0\Include\um" /I"C:\Program Files (x86)\Windows Kits\8.0\Include\shared" /I"C:\Program Files (x86)\Microsoft Visual Studio 11.0\VC\include"
|
1 #include <event.h>
2 #include <stdio.h>
3
4 int main()
5 {
6 const char *version = event_get_version();
7 printf("%s\n",version);
8 return 0;
9 }
4. thrift的编译/链接
*)下载thrift 0.9.0源码
下载网址: http://archive.apache.org/dist/thrift
*)thrift依赖的库
http://www.coder4.com/archives/3777
thrift 依赖 boost库(1.4.7), thriftnb 依赖 boost/libevent库
http://www.iteye.com/problems/87958
thrift在编译过程中, 会遇到二义性
“_wassert”: 对重载函数的调用不明确
void _wassert(const wchar_t *,const wchar_t *,unsigned int)
void apache::thrift::protocol::_wassert(const wchar_t *,const wchar_t *,unsigned int)
解决方案:
这算命令空间污染的问题, 添加::, 使得对_wassert的调用采用全局声明的那个函数
1 assert.h
2
3 #define assert(_Expression) (void)( (!!(_Expression)) || (_wassert(_CRT_WIDE(#_Expression), _CRT_WIDE(__FILE__), __LINE__), 0) )
4
5 #define assert(_Expression) (void)( (!!(_Expression)) || (::_wassert(_CRT_WIDE(#_Expression), _CRT_WIDE(__FILE__), __LINE__), 0) )
6
1 namespace cpp test
2
3 service HelloService {
4 string hello(1: string username);
5 }
1 2014/05/07 11:35 9,215 HelloService.cpp
2 2014/05/07 11:26 7,117 HelloService.h
3 2014/05/07 11:26 1,422 HelloService_server.skeleton.cpp
4 2014/05/07 11:35 268 hello_constants.cpp
5 2014/05/07 11:35 357 hello_constants.h
6 2014/05/07 11:35 202 hello_types.cpp
7 2014/05/07 11:35 367 hello_types.h
1 class HelloServiceHandler : virtual public HelloServiceIf {
2 public:
3 HelloServiceHandler() {
4 // Your initialization goes here
5 }
6
7 void hello(std::string& _return, const std::string& username) {
8 _return = "hello " + username;
9 }
10
11 };
12
13 int main(int argc, char **argv) {
14 int port = 9090;
15
16 TWinsockSingleton::create(); // 需要用户自己添加, 进行WSAStartup的初始化, 算是windows 版的thrift的一个疏忽
17
18 shared_ptr<HelloServiceHandler> handler(new HelloServiceHandler());
19 shared_ptr<TProcessor> processor(new HelloServiceProcessor(handler));
20 shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
21 shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
22 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
23
24 TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
25 server.serve();
26 return 0;
27 }
1
2 #include "HelloService.h"
3 #include <thrift/protocol/TBinaryProtocol.h>
4 #include <thrift/server/TSimpleServer.h>
5 #include <thrift/transport/TBufferTransports.h>
6 #include <thrift/transport/TSocket.h>
7
8 using namespace ::apache::thrift;
9 using namespace ::apache::thrift::protocol;
10 using namespace ::apache::thrift::transport;
11 using namespace ::apache::thrift::server;
12
13 using boost::shared_ptr;
14
15 using namespace ::test;
16
17 int main()
18 {
19
20 shared_ptr<TTransport> socket(new TSocket("127.0.0.1", 9090));
21 shared_ptr<TTransport> transport(new TBufferedTransport(socket));
22 shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
23
24 HelloServiceClient client(protocol);
25
26 try {
27 transport->open();
28 std::string res;
29 client.hello(res, "lilei");
30 std::cout << res << std::endl;
31 } catch(TException &e) {
32 std::cout << e.what() << std::endl;
33 }
34
35 return 0;
36 }
37