Thrift compiler代码生成类解析

时间:2023-03-08 19:22:58

代码生成类解析:

Thrift--facebook RPC框架,介绍就不说了,百度,google一大把,使用也不介绍,直接上结构和分析吧。

Hello.thrift文件内容如下:

namespace java com.tomsun.thrift.generated.demo
service Hello {
string helloString(1:string para)
}

内容很简单,申明个RPC service(Hello),服务方法helloString,方法参数格式(seq: parameter type, parameter name),参数需要标号(1: xxx xxx, 2: xxx xxx), namespace 指定生成代码语言类型(java),和Java包名(本文只讨论java ^#^!).

类文件解析

话说就上面定义一个service(只包含一个method),可生成的类文件可是一个庞然大物(975行代码),在此就不全贴出来了,说明时会贴出关键代码用于说明。

因为thrift是一个完整的RPC框架,内部结构分的很细,所以代码生成量理所当然,(protobuf的所谓的RPC,只不过是个架子,空的,基本上都得自己去实现。当protobuf序列化自己感觉比thrift丰富多了(sint,fint,int,uint)),

但thrift支持的容器类型(List,set, map)比protobuf多(list),具体介绍等以后再详细,回到正题。

thrift compiler生成的Hello.java 文件,以服务名为类文件名。服务接口定义如下:

   public interface Iface { //同步RPC服务调用接口定义

     public String helloString(String para) throws org.apache.thrift.TException;

   }

   public interface AsyncIface { //异步RPC服务调用接口定义(仔细瞅,方法返回值参数string没了,方法参数多了个AsyncMethodCallback)

     public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;

   }

Async形式具体返回值是通过callback回调来获得。定义如下:

 public interface AsyncMethodCallback<T> {
/**
* This method will be called when the remote side has completed invoking
* your method call and the result is fully read. For oneway method calls,
* this method will be called as soon as we have completed writing out the
* request.
* @param response
*/
public void onComplete(T response); /**
* This method will be called when there is an unexpected clientside
* exception. This does not include application-defined exceptions that
* appear in the IDL, but rather things like IOExceptions.
* @param exception
*/
public void onError(Exception exception);
}

自己定义异步RPC得实现该接口,同步RPC客户端骨架及其工厂类如下。

   public static class Client extends org.apache.thrift.TServiceClient implements Iface {
public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {

Client,客户端调用RPC骨架实现,封装了底层复杂RPC序列化,和网络传输。

 public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {

感觉thrift的工厂有点鸡肋的感念,没有设计模式三种工厂那种封装的味道,可有可无。

具体看一下同步骨架内部:

   public String helloString(String para) throws org.apache.thrift.TException
{
send_helloString(para);
return recv_helloString();
} public void send_helloString(String para) throws org.apache.thrift.TException
{
helloString_args args = new helloString_args();
args.setPara(para);
sendBase("helloString", args);
} public String recv_helloString() throws org.apache.thrift.TException
{
helloString_result result = new helloString_result();
receiveBase(result, "helloString");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "helloString failed: unknown result");
}

String helloString(String para)方法包含send,recv两同步操作,另外thrift为方法参数和返回值都生成一个java类,本例中的helloString_args,helloString_result.

public static class helloString_args implements org.apache.thrift.TBase<helloString_args, helloString_args._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_args>

public static class helloString_result implements org.apache.thrift.TBase<helloString_result, helloString_result._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_result> 

两者为rpc客户端和服务器端传输的数据,都实现TBase接口,

public interface TBase<T extends TBase<T,F>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {

  /**
* Reads the TObject from the given input protocol.
*
* @param iprot Input protocol
*/
public void read(TProtocol iprot) throws TException; /**
* Writes the objects out to the protocol
*
* @param oprot Output protocol
*/
public void write(TProtocol oprot) throws TException; /**
* Get the F instance that corresponds to fieldId.
*/
public F fieldForId(int fieldId); /**
* Check if a field is currently set or unset.
*
* @param field
*/
public boolean isSet(F field); /**
* Get a field's value by field variable. Primitive types will be wrapped in
* the appropriate "boxed" types.
*
* @param field
*/
public Object getFieldValue(F field); /**
* Set a field's value by field variable. Primitive types must be "boxed" in
* the appropriate object wrapper type.
*
* @param field
*/
public void setFieldValue(F field, Object value); public TBase<T, F> deepCopy(); /**
* Return to the state of having just been initialized, as though you had just
* called the default constructor.
*/
public void clear();
}

TBase定义read,write和具体字段是否设值的判定方法,以helloString_args具体解析:

 private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("helloString_args");

 private static final org.apache.thrift.protocol.TField PARA_FIELD_DESC = new org.apache.thrift.protocol.TField("para", org.apache.thrift.protocol.TType.STRING, (short)1);

struct_desc(TStruct)为thrift内部对象结构,作为远程传输读写的参数metadata元数据和标志位(readStructBegin(), writeStructEnd())

/**
* Helper class that encapsulates struct metadata.
*
*/
public final class TStruct {
public TStruct() {
this("");
} public TStruct(String n) {
name = n;
} public final String name; //(本例中为hellosString_args)
}

para_field_desc(TField)为传输结构对象中的变量结构,read,write时会对应相应的标志位(readTFiledBegin(), writeTFieldEnd()),具体为方法参数和返回值,本例中为helloString方法参数(如果多个方法参数,helloString_args中将对应多个TField元数据):

/**
* Helper class that encapsulates field metadata.
*
*/
public class TField {
public TField() {
this("", TType.STOP, (short)0);
} public TField(String n, byte t, short i) {
name = n; //参数名
type = t;//thrift内部类型
id = i;// seq number,即为hello.thrift方法参数定义的num.
} public final String name;
public final byte type;
public final short id; public String toString() {
return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">";
} @Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + id;
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + type;
return result;
} @Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TField otherField = (TField) obj;
return type == otherField.type && id == otherField.id;
}
}

thrift内部类型:

/**
* Type constants in the Thrift protocol.
*/
public final class TType {
public static final byte STOP = 0; //参数值没有设定,指定stop,read wire stream时会直接skip。
public static final byte VOID = 1;
public static final byte BOOL = 2;
public static final byte BYTE = 3;
public static final byte DOUBLE = 4;
public static final byte I16 = 6;
public static final byte I32 = 8;
public static final byte I64 = 10;
public static final byte STRING = 11;
public static final byte STRUCT = 12;
public static final byte MAP = 13;
public static final byte SET = 14;
public static final byte LIST = 15;
public static final byte ENUM = 16;
}
 static {
schemes.put(StandardScheme.class, new helloString_argsStandardSchemeFactory());
schemes.put(TupleScheme.class, new helloString_argsTupleSchemeFactory());
}

注册具体stream read,write的类和对应的工厂类,两种读写方法:

 public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
} public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
 private static class helloString_argsStandardScheme extends StandardScheme<helloString_args> {

      public void read(org.apache.thrift.protocol.TProtocol iprot, helloString_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // PARA
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.para = iprot.readString();
struct.setParaIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd(); // check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
} public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException {
struct.validate(); oprot.writeStructBegin(STRUCT_DESC);
if (struct.para != null) {
oprot.writeFieldBegin(PARA_FIELD_DESC);
oprot.writeString(struct.para);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
} }
 private static class helloString_argsTupleScheme extends TupleScheme<helloString_args> {

      @Override
public void write(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetPara()) {
optionals.set(0);
}
oprot.writeBitSet(optionals, 1);
if (struct.isSetPara()) {
oprot.writeString(struct.para);
}
} @Override
public void read(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
struct.para = iprot.readString();
struct.setParaIsSet(true);
}
}
} }

standard schema读写更倾向于结构化,(structBeging->fieldBegin()->value()->fieldEnd()->structEnd()),而tuple schema通过一个bitset(bit位操作)来代替(structBegin,fieldBegin,structEnd,filedEnd哪些占用流空间,因为上面提及struct,field里面包含struct名,field名,类型,seq num等占用流空间信息),减少序列化和网络传输的流大小。

public String para; // required

para,helloString_args结构内部成员(方法参数具体值)。 _Field,struct内部所有成员的enum表示,用于上面standard schema读取(见上述代码)

/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
PARA((short)1, "para"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
} /**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // PARA
return PARA;
default:
return null;
}
} /**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
} /**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
} private final short _thriftId;
private final String _fieldName; _Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
} public short getThriftFieldId() {
return _thriftId;
} public String getFieldName() {
return _fieldName;
}
}

struct中field元数据metadata,FieldMetaData包含(_Field enum, fieldvaluemetadata(参数名,requirement type, TType)):

 // isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.PARA, new org.apache.thrift.meta_data.FieldMetaData("para", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(helloString_args.class, metaDataMap);
}

Requirement Type enum:

/**
* Requirement type constants.
*
*/
public final class TFieldRequirementType {
public static final byte REQUIRED = 1;
public static final byte OPTIONAL = 2;
public static final byte DEFAULT = 3;
}

一些设值,判断是否设值操作:

    public String getPara() {
return this.para;
} public helloString_args setPara(String para) {
this.para = para;
return this;
} public void unsetPara() {
this.para = null;
} /** Returns true if field para is set (has been assigned a value) and false otherwise */
public boolean isSetPara() {
return this.para != null;
}

<**************************************************************************************************************************************************>

服务器端同步处理器类:

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor 

注册实际的处理函数类:

 private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
processMap.put("helloString", new helloString());
return processMap;
}

处理函数类:

 public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> {
public helloString() {
super("helloString");
} public helloString_args getEmptyArgsInstance() {
return new helloString_args();
} protected boolean isOneway() {//是否是单向RPC,只调用不要求返回
return false;
} public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException {
helloString_result result = new helloString_result();
result.success = iface.helloString(args.para);//调用用户编写的server实现类方法
return result; //方法值类(helloString_result类)
}
} }

<**********************************************************************************************************************************************************>

Async异步client和processor有所不同,来看看AsyncClient内部:

 public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
super(protocolFactory, clientManager, transport);
} public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
helloString_call method_call = new helloString_call(para, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}

1:TAsyncClientManager:异步客户端管理器类,AsyncClient统一把AsyncMethodCall传递给manager,内部用ConcurrentLinkedQueue,TimeOutSet,和另外开一个Thread来管理存储,消息的传递,接收,和超时处理,

并异常,完成时调用MethodCallback回调。

2:底层用Noblocking channel进行传递。

异步远程RPC调用:

public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
helloString_call method_call = new helloString_call(para, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}

helloString_call类封装了消息表现形式:

 public static class helloString_call extends org.apache.thrift.async.TAsyncMethodCall {
private String para;
public helloString_call(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.para = para;
} public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("helloString", org.apache.thrift.protocol.TMessageType.CALL, 0));
helloString_args args = new helloString_args();
args.setPara(para);
args.write(prot);
prot.writeMessageEnd();
} public String getResult() throws org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new IllegalStateException("Method call not finished!");
}
org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
return (new Client(prot)).recv_helloString();
}
} }

异步RPC调用状态:

  public static enum State {
CONNECTING, // connecting.连接
WRITING_REQUEST_SIZE, // writing_request_size.写请求长度
WRITING_REQUEST_BODY, // writing_request_body.写请求体
READING_RESPONSE_SIZE, // reading_repsonse_size.读返回长度
READING_RESPONSE_BODY, //reading_response_body.读返回体
RESPONSE_READ, //repsonse_read.返回读取完毕
ERROR;// error.有异常
}

消息类型:

public final class TMessageType {
public static final byte CALL = 1; // 调用
public static final byte REPLY = 2; //返回应答
public static final byte EXCEPTION = 3;//异常
public static final byte ONEWAY = 4;// 单向
}

Async服务器端处理,TBaseAsyncProcessor.process():

 //Find processing function
final TMessage msg = in.readMessageBegin();
AsyncProcessFunction fn = processMap.get(msg.name);
//Get Args
TBase args = (TBase)fn.getEmptyArgsInstance();
args.read(in);
in.readMessageEnd();
//start off processing function
fn.start(iface, args,fn.getResultHandler(fb,msg.seqid));
return true;
public static class helloString<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, helloString_args, String>

  public helloString_args getEmptyArgsInstance() {
return new helloString_args();
} public void start(I iface, helloString_args args, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws TException {
iface.helloString(args.para,resultHandler);
}
 public AsyncMethodCallback<String> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
final org.apache.thrift.AsyncProcessFunction fcall = this;
return new AsyncMethodCallback<String>() {
public void onComplete(String o) {
helloString_result result = new helloString_result();
result.success = o;
try {
fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
return;
} catch (Exception e) {
LOGGER.error("Exception writing to internal frame buffer", e);
}
fb.close();
}
public void onError(Exception e) {
byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
org.apache.thrift.TBase msg;
helloString_result result = new helloString_result();
{
msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
}
try {
fcall.sendResponse(fb,msg,msgType,seqid);
return;
} catch (Exception ex) {
LOGGER.error("Exception writing to internal frame buffer", ex);
}
fb.close();
}
};
} protected boolean isOneway() {
return false;
}

TProcessor, TTransport, async, schema,  server分析待续。