Thrift - 快速入门

时间:2023-03-10 05:17:45
Thrift - 快速入门

Getting Started

如果有homebrew的话,直接执行以下命令即可,brew会处理相关依赖(https://thrift.apache.org/docs/install/)。

brew install thrift

或者可以从源码安装。
下载tar包 https://thrift.apache.org/download
参考 https://thrift.apache.org/docs/BuildingFromSource

先写一个例子,目录结构如下:

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   └── resources
│   └── test
│   └── java
└── thrift
├── Common.thrift
└── ShopService.thrift

pom.xml中添加以下依赖:

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>

thrift目录下创建两个thrift文件:

Common.thrift

namespace java me.kavlez.thrift.service

service BaseService {
string echoServiceName()
}

ShopService.thrift

include "Common.thrift"

namespace java me.kavlez.thrift.service

struct Shop {
1: required i32 id,
2: required string name
} struct Item {
1: required i32 id,
2: required string name = "unknown",
3: required string detail,
4: required Shop shop
} service ShopService extends Common.BaseService { Shop queryShopInfo(1: i32 id),
bool isValidShop(1: Shop shop),
set<Item> queryItems(1: i32 shopId),
}

Thrift提供了多个语言的生成器实现,按照thrift文件生成java类,生成代码命令的用法如下:

thrift -r --gen <language> <Thrift filename>

其中-r即recursive,如果在文件中通过include关键字引用了其他文件,-r选项可以一并生成被引用的文件。

例如上面ShopService.thrift中的:

include Common.thrift

默认情况下,代码会在gen-<language>目录下生成,生成目录可以通过--out指定。

生成后再拷贝有点麻烦,直接生成到代码目录下,在工程目录下执行以下命令:

thrift -r --gen java --out src/main/java thrift/ShopService.thrift

执行后src/main/java/目录下生成me/kavlez/thrift/service/目录,以及4个java文件。

在service目录下创建impl,提供接口实现:

package me.kavlez.thrift.service.impl;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.Shop;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException; import java.util.Collections;
import java.util.HashSet;
import java.util.Set; /**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class ShopServiceImpl implements ShopService.Iface {
@Override
public Shop queryShopInfo(int id) throws TException {
return new Shop(id, "DMC_".concat(String.valueOf(id)));
} @Override
public boolean isValidShop(Shop shop) throws TException {
return shop != null;
} @Override
public Set<Item> queryItems(int shopId) throws TException { if (shopId < 1) {
return Collections.emptySet();
} Set<Item> items = new HashSet<>();
Shop shop = new Shop(1101, "DMC");
for (int i = 0; i < 8; i++) {
Item item = new Item(shopId + i, "sample_".concat(String.valueOf(shopId + i))
, "this is sample_".concat(String.valueOf(i))
, shop);
items.add(item);
}
return items;
} @Override
public String echoServiceName() throws TException {
return "alo! this is shop service!";
}
}

除了业务实现,我们需要额外做两件事情——构建Server和Client。

构建Server,也就是为Server指定Transparent、Protocol、Processor:

package me.kavlez.thrift.server;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException; /**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class SimpleServerHolder { public static TServer buildServer() {
TServerSocket serverSocket = null;
try {
serverSocket = new TServerSocket(8081);
} catch (TTransportException e) {
e.printStackTrace();
}
TProcessor tprocessor = new ShopService.Processor<ShopService.Iface>(new ShopServiceImpl()); TServer.Args tArgs = new TServer.Args(serverSocket);
tArgs.protocolFactory(new TCompactProtocol.Factory());
tArgs.processor(tprocessor); TServer server = new TSimpleServer(tArgs);
return server;
} public static void main(String[] args) {
TServer server = SimpleServerHolder.buildServer();
log.info("server ready...");
server.serve();
}
}

相应地,构建Client:

package me.kavlez.thrift.client;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport; import java.util.Set; /**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class SimpleClientHolder { private TTransport transport; public ShopService.Client buildClient(String serverAddr, int serverPort, int timeout) throws TException { this.transport = new TSocket(serverAddr, serverPort, timeout);
TProtocol protocol = new TCompactProtocol(transport);
transport.open(); ShopService.Client client = new ShopService.Client(protocol);
return client;
} public static void main(String[] args) {
SimpleClientHolder simpleClientHolder = new SimpleClientHolder();
ShopService.Client client = null;
try {
client = simpleClientHolder.buildClient("localhost", 8081, 1000);
Set<Item> items = client.queryItems(666);
log.info("return items = {}", String.valueOf(items));
} catch (TException e) {
e.printStackTrace();
} if (null != simpleClientHolder.transport) {
simpleClientHolder.transport.close();
}
}
}

依次运行Server和Client,输出正常。

IDL (Interface Description Language)

提供服务的第一步是用IDL编写Thrift文件,IDL几乎可以描述接口所需的所有元素,接口定义中包括以下内容:

namespace

每个thrift文件都在自己的命名空间中,多个thrift文件可以用同一个命名空间作为标识,并指定要使用的语言的generator。

例如:

namespace java me.kavlez.thrift.service
namespace php tutorial

基本类型

类型 说明
bool 布尔类型
i8 (byte) 8-bit 有符号整型,对应java的byte
i16 16-bit 有符号整型,对应java的short
i32 32-bit 有符号整型,对应java的int
i64 64-bit 有符号整型,对应java的long
double 64-bit 浮点类型,对应java的double
string 字符串
binary Blob (byte array)

结构体

用于定义一个对象类型。

字段默认为optional,可以声明required。
字段可以设置默认值。
结构体之间可以互相引用。
0.9.2开始可以引用自身。

struct Shop {
1: required i32 id,
2: required string name
} struct Item {
1: required i32 id,
2: required string name = "unknown",
3: required string detail,
4: required Shop shop
}

枚举

值是可选项,枚举不能嵌套;基本上就是K、V的形式,不能描述太复杂的枚举类。

enum Numberz {
ONE = 1,
TWO,
THREE,
FIVE = 5,
SIX,
EIGHT = 8
}

常量

可以自定义常量,像Map、List这样的复杂结构可以用json表示。

const i32 INT_CONST = 1234;    // a
const map<string,string> MAP_CONST = {"hello": "world", "goodnight": "moon"}
const list<string> LIST_CONST = ["a","b","c"]

容器类型

不支持异构容器,容器的元素类型必须一致。
元素类型可以是service以外的任何类型。

类型 说明
map<t1,t2> Map from one type to another
list<t1> Ordered list of one type
set<t1> Set of unique elements of one type

自定义异常

语法上和struct相似,生成后的代码,不同语言各有各的实现方式。

exception IllegalShopException {
1: i32 errorCode,
2: string message,
3: Shop shop
}

service

一个函数集合,语法和java定义接口的语法类似,下面是一些例子。

service ThriftTest {

  /**
* 无返回,空参数列表
*/
void testVoid(), /**
* 声明返回类型、参数
*/
string testString(1: string thing), /**
* 返回结构体
*/
Shop queryShopInfo(1: i32 id), /**
* 结构体作为参数
*/
bool isValidShop(1: Shop shop), /**
* ...
*/
set<Item> queryItems(1: i32 shopId), /**
* 抛出异常
*/
bool changeShopStatus(1: i32 shopId) throws(1: IllegalShopException err), /**
* 多异常
*/
bool changeItemStatus(1: i32 itemId) throws(1: IllegalShopException shopErr,2:IllegalItemException itemErr), /**
* oneway表示该方法在客户端发起请求后不会等待响应,返回类型必须为void
*/
oneway void sendMessage(1:i32 shopId,2:string message)
}

thrift working stack

用Thrift构建服务和客户端,架构如下:

+-------------------+       +-------------------+
| Server | | Client |
| | | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | your code | | | | your code | |
| +---------------+ | | +---------------+ |
| | Service | | | | Service | |
| | processor | | | | Client | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | Protocol | | | | Protocol | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | Transport |<--------->| Transport | |
| +---------------+ | | +---------------+ |
+-------------------+ +-------------------+

生成的接口类中大致包括三样,分别是Iface、Client、Processor。
另外还有Server、Transport、Protocol。

Transport

在RPC框架的语境下谈传输层很容易只想到网络通信,但Transport表述的并不只是网络通信。

不如说Transport是多种IO的抽象,其不仅限于网络IO。

比如,基础的TIOStreamTransport,以及其两个子类,TSocket和TZlibTransport。

TSocket在上面的例子中作为TBinaryProtocol依赖的transport类型,与Server的TServerSocket进行通信。

但后者是封装了InflaterInputStream和DeflaterOutputStream,其InputStream并不要求是SocketInputStream。

从开发角度来讲,如果将一个TMemoryBuffer对象传入Protocol,并以此创建某个service对应的Client,再调用相应接口。

整个过程在代码上并没有什么限制,只是运行时抛出org.apache.thrift.TApplicationException。

Protocol

protocol依赖transport,决定双方以什么协议通信,同时也是通信内容的载体。

org.apache.thrift.protocol.TProtocol中的方法声明里,一系列readXX和writeXX,在具体实现中通常都是通过transport来完成。

以TJSONProtocol为例,其实现的TProtocol的所有write方法都是以几个私有的write方法组织起来。

比如,writeI32和writeI64都是通过私有方法writeJSONInteger,而writeJSONInteger则是由实例化时传入的trasnport进行write。

Processor

构建自己的server时需要在tArgs提供一个Processor,比如本文中的ShopService.Processor。
(p.s. 如果需要提供多个Processor,比如再加一个ItemService,则使用TMultiplexedProcessor即可。)

Server通过Processor执行业务逻辑代码,文件中描述的每个函数作为ProcessFunction子类进行实例化,放入Processor的processMap中。

Server收到请求,从输入的protocol中读取方法名,根据方法名从processMap中拿到对应的ProcessFunction;
通过ProcessFunction的process方法执行业务逻辑,过程大体分为3步:

  • 从protocol读入请求参数,构建参数对象;
  • 传入参数,本地执行业务方法。假设方法名为"getItems",调用结果则为getItems_success;
  • 将结果写入protocol,调用protocol.writeXX;

Client

像本文中,指定Transport和Protocol,构建ShopService.Client,客户端通过Client对象像调用本地方法一样调用queryItems;
在ShopService中,Client类同样实现了ShopService.Iface中的方法,以queryItems为例,其实现如下:

public Shop queryShopInfo(int id) throws org.apache.thrift.TException {
send_queryShopInfo(id);
return recv_queryShopInfo();
}

在send_queryShopInfo,构建该函数对应的xx_args对象,将其写入oprot,并通过oprot.tranport进行flush;

相应地,recv_queryShopInfo就是从iport中读取函数的返回值,构建该函数对应的queryShopInfo_result对象。

Server

将Transport、Protocol和Processor集合在一起就是一个完整的Server,父类TServer提供了唯一的抽象方法——serve()。

以TSimpleServer为例,serve中通过java.lang.ServerSocket的accept获取client Socket并转为client Transport,以此获取相应的Processor、创建相应的inputTransport、outputTransport和iProt、oProt。

(p.s. 默认的TProcessorFactory没有子类,其getProcessor(Transport)和并没有通过transport来获取processor。可以用来扩展,比如用一个server提供多版本服务之类的。)

剩下的工作由Processor进行处理,从iPort读入请求信息并构造TMessage,找到相应的ProcessFunction并执行其process方法,这个在上面说过。

Thrift为TServer提供了3种实现:

  • TSimpleServer: 单线程ServerSocket实现,仅用于测试;

  • TThreadPoolServer: 封装了ThreadPoolExecutor,用内部类WorkerProcess表示单个请求,通过每个WorkerProcess对象的transport获取相应的Processor和Protocol,调用业务代码并返回;

  • AbstractNonblockingServer: 非阻塞server抽象类,其serve()方法即整个过程的skeleton,serve()中调用的方法交给其子类提供具体实现。

    public void serve() {
    // start any IO threads
    if (!startThreads()) {
    return;
    } // start listening, or exit
    if (!startListening()) {
    return;
    } setServing(true); // this will block while we serve
    waitForShutdown(); setServing(false); // do a little cleanup
    stopListening();
    }

AbstractNonblockingServer的3个子类,分别为:

  • TNonblockingServer: 实现父类的startThreads(),启动selector线程(也就是SelectAcceptThread,父类声明了protected final Selector selector),开始轮询SelectedKeys,检查状态并进行相应处理:

    if (key.isAcceptable()) {
    handleAccept();
    } else if (key.isReadable()) {
    handleRead(key);
    } else if (key.isWritable()) {
    handleWrite(key);
    } else {
    LOGGER.warn("Unexpected state in select! " + key.interestOps());
    }

    另外,使用TNonblockingServer时transport必须为TFramedTransport,以此保证能正确读取单次方法调用。

  • THsHaServer: "HsHa",即"Half-Sync/Half-Async",是TNonblockingServer的子类。

    工作流程和TNonblockingServer相似,主要区别在与handleRead()
    handleRead中完成读取后,另外一项重要的工作就是requestInvoke(buffer),也就是执行processor.process(iProt,oProt)。

    不过,TNonblockingServer是单线程执行,而THsHaServer则是通过线程池。
    将FrameBuffer装进Invocation(其run方法即frameBuffer.invoke()),提交给线程池处理。

    线程池参数的默认值如下:

    corePoolSize = 5;
    maximumPoolSize = Integer.MAX_VALUE;
    keepAliveTime = 60;
    workQueue = new LinkedBlockingQueue<Runnable>();
  • TThreadedSelectorServer: 进一步加强HsHaServer,用一个AcceptThread接收所有连接请求,并担任负载均衡的角色。

    负载均衡的工作由构造器参数中的SelectorThreadLoadBalancer进行,该类只提供了一种实现——对已注册的selector线程列表进行round robin。
    AcceptThread处理连接时,通过SelectorThreadLoadBalancer选出selector线程,将接收到的socketChannel放入selector线程的队列中。

    虽然TThreadedSelectorServer的requestInvoke也是使用线程池进行,但线程池的默认配置和THsHaServer不同,默认时为corePoolSize为5的FixedThreadPool。
    如果corePoolSize小为0,则由caller线程执行。

最后,把之前的例子修改一下,看看效果。

AbstractTServerHolder.java

package me.kavlez.thrift.server;

import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException; public abstract class AbstractTServerHolder {
private TServer tServer; public abstract TServer build() throws TTransportException;
}

ThreadedSelectorServerHolder.java

package me.kavlez.thrift.server;

import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException; public class ThreadedSelectorServerHolder extends AbstractTServerHolder {
@Override
public TServer build() throws TTransportException {
TNonblockingServerTransport transport = new TNonblockingServerSocket(8090);
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport); ShopService.Processor<ShopService.Iface> shopServiceProcessor
= new ShopService.Processor<ShopService.Iface>(new ShopServiceImpl());
args.processor(shopServiceProcessor)
.protocolFactory(new TBinaryProtocol.Factory())
.transportFactory(new TFramedTransport.Factory()); TServer server = new TThreadedSelectorServer(args);
return server; }
}

Launcher.java

package me.kavlez.thrift;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.client.AbstractShopServiceClientHolder;
import me.kavlez.thrift.client.NonBlockingClientHolder;
import me.kavlez.thrift.client.ShopServiceClientAgent;
import me.kavlez.thrift.server.AbstractTServerHolder;
import me.kavlez.thrift.server.ThreadedSelectorServerHolder;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException; import java.io.FileNotFoundException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; @Slf4j
public class Launcher { static class TServerClientHolderPair {
private AbstractTServerHolder tServerHolder;
private Class<? extends AbstractShopServiceClientHolder> clientHolderClass; public TServerClientHolderPair(AbstractTServerHolder tServerHolder, Class<? extends AbstractShopServiceClientHolder> clientHolderClass) {
this.tServerHolder = tServerHolder;
this.clientHolderClass = clientHolderClass;
}
} public static void main(String[] args) throws InterruptedException, TTransportException, FileNotFoundException { final AbstractTServerHolder serverHolder = new ThreadedSelectorServerHolder();
final TServer tServer = serverHolder.build(); ExecutorService executorService = Executors.newCachedThreadPool();
Future<?> serverFuture = executorService.submit(new Runnable() {
@Override
public void run() {
tServer.serve();
}
}); Thread.sleep(100); int times = 10;
final CountDownLatch countDownLatch = new CountDownLatch(times); class ShopServiceClientTask implements Runnable { @Override
public void run() {
AbstractShopServiceClientHolder clientHolder = null;
clientHolder = new NonBlockingClientHolder(); try {
ShopService.Iface shopService = new ShopServiceClientAgent(clientHolder.build());
for (int i = 0; i < 1000; i++) {
Set<Item> items = shopService.queryItems(666);
log.info("return items = {}", String.valueOf(items));
} } catch (TException e) {
log.info("thread name={} get TException", Thread.currentThread().getName(), e);
} finally {
clientHolder.close();
countDownLatch.countDown();
}
}
} long start = System.currentTimeMillis(); for (int i = 0; i < times; i++) {
executorService.submit(new ShopServiceClientTask());
} countDownLatch.await();
log.info("used {} ms ", System.currentTimeMillis() - start);
tServer.setShouldStop(true);
tServer.stop();
executorService.shutdown(); }
}