springboot与thrift集成实现服务端和客户端

时间:2021-05-25 14:57:48

我们这里用一个简单的小功能来演示一下如何使用springboot集成thrift

这个功能是,判断hdfs路径是否存在。

 

1、先解决依赖

 <dependencies>
    <!-- Spring Boot starters. No versions are declared here; presumably they are
         managed by a Spring Boot parent/BOM outside this snippet (TODO confirm). -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Test-only libraries: restrict to the test scope so they are not put on the
         runtime classpath or packaged with the application. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <!-- Thrift runtime library used by both the server and the client code. -->
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.10.0</version>
    </dependency>

    <!-- Hadoop/Hive libraries are declared with the "provided" scope, i.e. they are
         expected to be supplied by the environment the application runs in. -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

 

2、编译thrift文件

先安装thrift编译器。

jazz.thrift文件如下。namespace相当于java里的package。thrift文件的写法这里就不赘述了。

// Java package that the generated JazzService.java is placed in.
namespace java com.xiaoju.dqa.jazz.iface

// RPC service definition; the Thrift compiler generates the Java stubs from it.
service JazzService{
// Takes a single string argument (field id 1) and returns a bool.
bool exists(1:string path)
}

编译thrift文件

thrift -gen java jazz.thrift

这将生成JazzService.java文件,将其拷贝到项目中,放到namespace指定的package下。

 

3、实现server端

编写controller实现功能。

可以看到Controller实现了JazzService.Iface接口,这个接口就定义在刚才生成的JazzService.java文件中。

在这个Controller中我们重写了接口的exists方法,这个方法就是定义在thrift文件中的方法。

package com.xiaoju.dqa.jazz.service.controller;

import com.xiaoju.dqa.jazz.hadoop.client.HadoopClient;
import com.xiaoju.dqa.jazz.hive.client.HiveClient;
import com.xiaoju.dqa.jazz.iface.JazzService;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;

/**
 * Server-side implementation of the generated {@code JazzService.Iface}.
 *
 * <p>Each RPC method is answered by delegating to the injected clients.
 */
@Controller
public class JazzRpcController implements JazzService.Iface {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private HadoopClient hadoopClient;
    // Not referenced by any method of this class; kept so bean wiring is unchanged.
    @Autowired
    private HiveClient hiveClient;

    /**
     * Checks whether {@code path} exists by delegating to {@code HadoopClient.exists}.
     *
     * <p>A failed check is reported to the caller as a {@link TException} instead of
     * being turned into {@code false}; otherwise the client could not tell
     * "the path does not exist" apart from "the check itself failed".
     *
     * @param path the path to check
     * @return the result returned by {@code hadoopClient.exists(path)}
     * @throws TException if the underlying check throws; the original exception is the cause
     */
    @Override
    public boolean exists(String path) throws TException {
        try {
            boolean isExists = hadoopClient.exists(path);
            logger.info("[存在]判断路径是否存在成功, 路径={}, 结果={}", path, isExists);
            return isExists;
        } catch (Exception e) {
            logger.error("[存在]判断路径是否存在失败, 路径={}", path, e);
            // Propagate through the declared exception type, preserving the cause.
            throw new TException("failed to check whether path exists: " + path, e);
        }
    }
}

为了启动thriftserver我们建立一个类ThriftServer

package com.xiaoju.dqa.jazz.service.server;

import com.xiaoju.dqa.jazz.iface.JazzService;
import com.xiaoju.dqa.jazz.service.controller.JazzRpcController;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Spring component that exposes {@link JazzRpcController} through a Thrift
 * {@link TThreadPoolServer}.
 *
 * <p>The listening port and the worker-thread bounds are read from the
 * {@code thrift.port}, {@code thrift.minWorkerThreads} and
 * {@code thrift.maxWorkerThreads} properties.
 */
@Component
public class ThriftServer {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${thrift.port}")
    private int port;
    @Value("${thrift.minWorkerThreads}")
    private int minThreads;
    @Value("${thrift.maxWorkerThreads}")
    private int maxThreads;

    private TBinaryProtocol.Factory protocolFactory;
    private TTransportFactory transportFactory;

    @Autowired
    private JazzRpcController jazzRpcController;

    /** Creates the protocol and transport factories used by {@link #start()}. */
    public void init() {
        protocolFactory = new TBinaryProtocol.Factory();
        transportFactory = new TTransportFactory();
    }

    /**
     * Builds the server from the configured port and thread bounds and calls
     * {@code serve()} on it.
     *
     * <p>Any exception raised while setting up or serving is logged and not rethrown.
     * NOTE(review): {@code serve()} is expected to block the calling thread until the
     * server stops - confirm against the Thrift version in use.
     */
    public void start() {
        //TMultiplexedProcessor processor = new TMultiplexedProcessor();
        //processor.registerProcessor(JazzService.class.getSimpleName(), new JazzService.Processor<JazzService.Iface>(hadoopService));
        // Parameterised type instead of the raw JazzService.Processor.
        JazzService.Processor<JazzService.Iface> processor =
                new JazzService.Processor<JazzService.Iface>(jazzRpcController);
        init();
        try {
            TServerTransport transport = new TServerSocket(port);
            TThreadPoolServer.Args tArgs = new TThreadPoolServer.Args(transport);
            tArgs.processor(processor);
            tArgs.protocolFactory(protocolFactory);
            tArgs.transportFactory(transportFactory);
            tArgs.minWorkerThreads(minThreads);
            tArgs.maxWorkerThreads(maxThreads);
            TServer server = new TThreadPoolServer(tArgs);
            //TServer server = new TSimpleServer(tArgs);
            // Logged before serve() because nothing after serve() runs while the server is up.
            logger.info("thrift服务启动成功, 端口={}", port);
            server.serve();
        } catch (Exception e) {
            logger.error("thrift服务启动失败", e);
        }
    }
}

server端启动方式

这里我们使用了bean注入的方式启动thriftserver。

package com.xiaoju.dqa.jazz;

import com.xiaoju.dqa.jazz.service.server.ThriftServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;


/**
 * Application entry point: boots the Spring context, then looks up the
 * {@link ThriftServer} bean and starts it.
 */
@SpringBootApplication
public class JazzApplication {
    private static ThriftServer thriftServer;

    /**
     * Starts Spring Boot and then the Thrift server.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        ApplicationContext applicationContext = SpringApplication.run(JazzApplication.class, args);
        try {
            thriftServer = applicationContext.getBean(ThriftServer.class);
            thriftServer.start();
        } catch (Exception startupFailure) {
            // Bean lookup or server start failed: print the stack trace and return.
            startupFailure.printStackTrace();
        }
    }
}

 

4、实现client端

定义JazzClient

package com.xiaoju.dqa.jazz.client;

import com.xiaoju.dqa.jazz.iface.JazzService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

/**
 * Thin wrapper around the generated {@code JazzService.Client} and the
 * {@link TSocket} it talks over.
 *
 * <p>Usage order: set host and port, call {@link #init()}, then {@link #open()}
 * before issuing calls through {@link #getJazzService()}, and {@link #close()} afterwards.
 * This class performs no synchronisation of its own.
 */
public class JazzClient {
    private JazzService.Client jazzService;
    private TBinaryProtocol protocol;
    private TSocket transport;
    private String host;
    private int port;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /** Creates the socket, protocol and service stub from the current host and port. */
    public void init() {
        transport = new TSocket(host, port);
        protocol = new TBinaryProtocol(transport);
        jazzService = new JazzService.Client(protocol);
    }

    /**
     * Returns the service stub created by {@link #init()}.
     *
     * @return the stub, or {@code null} if {@link #init()} has not been called
     */
    public JazzService.Client getJazzService() {
        return jazzService;
    }

    /**
     * Opens the underlying socket.
     *
     * @throws TTransportException if {@link #init()} has not been called yet, or if the
     *         socket itself fails to open
     */
    public void open() throws TTransportException {
        if (transport == null) {
            // Report the misuse through the declared exception instead of an NPE.
            throw new TTransportException(TTransportException.NOT_OPEN,
                    "JazzClient.init() must be called before open()");
        }
        transport.open();
    }

    /**
     * Closes the underlying socket. Does nothing if {@link #init()} has not been called,
     * so it is safe to call from a {@code finally} block.
     */
    public void close() {
        if (transport != null) {
            transport.close();
        }
    }

}

config生成bean

package com.xiaoju.dqa.jazz.configuration;

import com.xiaoju.dqa.jazz.client.JazzClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes a {@link JazzClient} bean built from the
 * {@code thrift.host} and {@code thrift.port} properties.
 */
@Configuration
public class JazzClientConfig {
    @Value("${thrift.host}")
    private String host;
    @Value("${thrift.port}")
    private int port;

    /**
     * Creates the client with the configured endpoint. The container calls the
     * bean's {@code init} method afterwards, as declared by {@code initMethod}.
     *
     * @return the configured client
     */
    @Bean(initMethod = "init")
    public JazzClient jazzClient() {
        JazzClient client = new JazzClient();
        client.setPort(port);
        client.setHost(host);
        return client;
    }
}

写一个controller作为调用入口

package com.xiaoju.dqa.jazz.controller;

import com.xiaoju.dqa.jazz.client.JazzClient;
import com.xiaoju.dqa.jazz.response.Response;
import com.xiaoju.dqa.jazz.response.ResultCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;

/**
 * REST entry point under {@code /jazz} that forwards requests to the Thrift
 * service through the injected {@link JazzClient}.
 */
@RestController
@RequestMapping("/jazz")
public class JazzClientController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private JazzClient jazzClient;

    /**
     * Handles {@code GET /jazz/exists}: reads the {@code path} request parameter,
     * asks the Thrift service whether it exists and returns the answer under the
     * {@code "result"} key of the response payload.
     *
     * @param request  the HTTP request carrying the {@code path} parameter
     * @param response the HTTP response (not used by this method)
     * @return a {@code SUCCESS} response holding the result, or an {@code EXCEPTION}
     *         response if opening the connection or the remote call fails
     */
    @RequestMapping(value = "/exists", method = RequestMethod.GET)
    public Response exists(HttpServletRequest request, HttpServletResponse response) {
        Map<String, Object> retMap = new HashMap<String, Object>();
        // The injected client wraps a single connection that is opened and closed per
        // request, so the whole open/call/close sequence is serialised; otherwise
        // concurrent requests could open an already-open socket or close a
        // connection another request is still using.
        synchronized (jazzClient) {
            try {
                logger.info("[存在]判断路径是否存在");
                String path = request.getParameter("path");
                jazzClient.open();
                boolean isExists = jazzClient.getJazzService().exists(path);
                retMap.put("result", isExists);
                logger.info("[存在]判断路径是否存在成功, 返回={}", retMap);
                return new Response(ResultCode.SUCCESS, "判断存在成功" , retMap);
            } catch (Exception e) {
                logger.error("[存在]判断路径是否存在失败, 返回={}", retMap, e);
                return new Response(ResultCode.EXCEPTION, "判断存在失败", retMap);
            } finally {
                jazzClient.close();
            }
        }
    }
}

 

你可以使用如下方式测试代码

curl "http://10.93.18.34:8698/jazz/exists?path=/home/...."