自定义基于netty的rpc框架(3)---客户端的实现

时间:2021-11-10 08:56:27

1、客户端

1.1、rpc-client-demo 客户端框架的实现

cn.tianjun.rpc.client.RpcClient

package cn.tianjun.rpc.client;

import cn.tianjun.rpc.utils.RpcDecoder;
import cn.tianjun.rpc.utils.RpcEncoder;
import cn.tianjun.rpc.utils.RpcRequest;
import cn.tianjun.rpc.utils.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
* 框架的RPC 客户端(用于发送 RPC 请求)
*
*/

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

private static final Logger LOGGER = LoggerFactory
.getLogger(RpcClient.class);

private String host;
private int port;

private RpcResponse response;

private final Object obj = new Object();

public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}

/**
* 链接服务端,发送消息
*
* @param request
* @return
* @throws Exception
*/

public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel)
throws Exception {
// 向pipeline中添加编码、解码、业务处理的handler
channel.pipeline()
.addLast(new RpcEncoder(RpcRequest.class)) //OUT - 1
.addLast(new RpcDecoder(RpcResponse.class)) //IN - 1
.addLast(RpcClient.this); //IN - 2
}
}).option(ChannelOption.SO_KEEPALIVE, true);
// 链接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
//将request对象写入outbundle处理后发出(即RpcEncoder编码器)
future.channel().writeAndFlush(request).sync();

// 用线程等待的方式决定是否关闭连接
// 其意义是:先在此阻塞,等待获取到服务端的返回后,被唤醒,从而关闭网络连接
synchronized (obj) {
obj.wait();
}
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}

/**
* 读取服务端的返回结果
*/

@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response)
throws Exception {
this.response = response;

synchronized (obj) {
obj.notifyAll();
}
}

/**
* 异常处理
*/

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}
}

cn.tianjun.rpc.client.RpcProxy

package cn.tianjun.rpc.client;

import cn.tianjun.rpc.utils.RpcRequest;
import cn.tianjun.rpc.utils.RpcResponse;
import cn.tianjun.zk.ServiceDiscovery;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;


/**
 * RPC proxy factory: creates dynamic proxies whose method calls are turned
 * into {@link RpcRequest}s and sent to the server through {@link RpcClient}.
 *
 * <p>The server address is either fixed (string constructor) or looked up
 * through {@link ServiceDiscovery} on every call.
 */
public class RpcProxy {

    /** Fixed {@code host:port} address; {@code null} when discovery is used. */
    private final String serverAddress;

    /** Service discovery used to look up the address; {@code null} when the address is fixed. */
    private final ServiceDiscovery serviceDiscovery;

    /**
     * @param serverAddress server address in {@code host:port} form
     */
    public RpcProxy(String serverAddress) {
        this.serverAddress = serverAddress;
        this.serviceDiscovery = null;
    }

    /**
     * @param serviceDiscovery discovery component queried for the server
     *        address on every invocation
     */
    public RpcProxy(ServiceDiscovery serviceDiscovery) {
        this.serverAddress = null;
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Creates a proxy for the given service interface.
     *
     * @param interfaceClass the service interface to proxy
     * @param <T> the type the caller assigns the proxy to
     * @return a proxy that forwards interface method calls to the remote server
     */
    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[] { interfaceClass }, new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method,
                            Object[] args) throws Throwable {
                        // equals/hashCode/toString are answered locally: they are
                        // not part of the service interface and must not become RPCs.
                        if (method.getDeclaringClass() == Object.class) {
                            return invokeObjectMethod(proxy, method, args);
                        }

                        // Build the request describing the invoked method.
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        // Name of the service interface that declares the method.
                        request.setClassName(method.getDeclaringClass()
                                .getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);

                        // Resolve the address into a local variable so that
                        // concurrent invocations do not overwrite shared state.
                        String address = serverAddress;
                        if (serviceDiscovery != null) {
                            address = serviceDiscovery.discover();
                        }
                        if (address == null) {
                            throw new IllegalStateException(
                                    "no RPC server address available");
                        }
                        String[] array = address.split(":");
                        if (array.length < 2) {
                            throw new IllegalStateException(
                                    "invalid RPC server address (expected host:port): "
                                            + address);
                        }
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);

                        // Send the request over a new Netty-based client.
                        RpcClient client = new RpcClient(host, port);
                        RpcResponse response = client.send(request);
                        if (response == null) {
                            throw new IllegalStateException(
                                    "no response received from " + address);
                        }

                        // Rethrow the remote error or hand back the result.
                        if (response.isError()) {
                            throw response.getError();
                        } else {
                            return response.getResult();
                        }
                    }
                });
    }

    /** Handles the {@code java.lang.Object} methods that the proxy dispatches to its handler. */
    private static Object invokeObjectMethod(Object proxy, Method method,
            Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        // Only toString remains among the Object methods routed to the handler.
        return proxy.getClass().getName() + "@"
                + Integer.toHexString(System.identityHashCode(proxy));
    }
}

pom.xml

<!-- Maven build descriptor for the rpc-client module (the client side of the RPC framework). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<!-- Inherits from the rpc.demo parent POM. -->
<parent>
<artifactId>rpc.demo</artifactId>
<groupId>tj.cmcc.org</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-client</artifactId>
<packaging>jar</packaging>
<name>rpc-client Maven Webapp</name>
<url>http://maven.apache.org</url>
<!-- No versions are declared here; presumably the parent's dependencyManagement supplies them (TODO confirm). -->
<dependencies>
<!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<!-- Sibling RPC modules: rpc-zk and rpc-utils -->
<dependency>
<groupId>tj.cmcc.org</groupId>
<artifactId>rpc-zk</artifactId>
</dependency>
<dependency>
<groupId>tj.cmcc.org</groupId>
<artifactId>rpc-utils</artifactId>
</dependency>
</dependencies>

<build>
<finalName>rpc-client</finalName>
</build>
</project>

1.2、rpc-client-impl-demo客户端业务的实现

cn.tianjun.rpc.clientImpl.HelloServiceTest

package cn.tianjun.rpc.clientImpl;

import cn.tianjun.rpc.client.RpcProxy;
import cn.tianjun.rpc.protocol.HelloService;
import cn.tianjun.rpc.protocol.Person;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


/**
 * Spring-driven test that calls the remote HelloService through a proxy
 * created by the injected {@link RpcProxy} and prints what the server returns.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class HelloServiceTest {

    @Autowired
    private RpcProxy rpcProxy;

    /** Calls {@code hello} with a plain string argument. */
    @Test
    public void helloTest1() {
        // create() builds the proxy; the hello() call goes through its invocation handler.
        HelloService service = rpcProxy.create(HelloService.class);
        printResult(service.hello("World"));
    }

    /** Calls {@code hello} with a {@link Person} argument. */
    @Test
    public void helloTest2() {
        HelloService service = rpcProxy.create(HelloService.class);
        Person person = new Person("Tian", "Jun");
        printResult(service.hello(person));
    }

    /** Prints the header line followed by the value returned by the server. */
    private static void printResult(String result) {
        System.out.println("服务端返回结果:");
        System.out.println(result);
    }
}

spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the client: wires a ServiceDiscovery bean and the RpcProxy that uses it. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd"
>


<!-- Loads rpc.properties from the classpath so that ${...} placeholders below can be resolved. -->
<context:property-placeholder location="classpath:rpc.properties"/>

<!-- Service discovery, constructed with the registry address taken from registry.address. -->
<bean id="serviceDiscovery" class="cn.tianjun.zk.ServiceDiscovery">
<constructor-arg name="registryAddress" value="${registry.address}"/>
</bean>

<!-- RPC proxy factory built through its ServiceDiscovery constructor. -->
<bean id="rpcProxy" class="cn.tianjun.rpc.client.RpcProxy">
<constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>
</bean>

</beans>

rpc.properties

# ZooKeeper server address list (comma-separated host:port entries).
# Referenced as ${registry.address} by the serviceDiscovery bean in spring.xml.
registry.address=mini04:2181,mini05:2181,mini06:2181

log4j.properties

# Root logger: only errors, written to the console appender.
log4j.rootLogger=ERROR,console

# Console appender printing the bare message followed by a newline.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%m%n

# DEBUG output for the RPC framework classes. The logger name must match the
# project's package (cn.tianjun.rpc); the placeholder com.xxx.rpc matched nothing.
log4j.logger.cn.tianjun.rpc=DEBUG

pom.xml:

<!-- Maven build descriptor for the rpc-client-impl module (client-side business code and tests). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<!-- Inherits from the rpc.demo parent POM. -->
<parent>
<artifactId>rpc.demo</artifactId>
<groupId>tj.cmcc.org</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-client-impl</artifactId>
<packaging>jar</packaging>
<name>rpc-client-impl Maven Webapp</name>
<url>http://maven.apache.org</url>
<!-- No versions are declared here; presumably the parent's dependencyManagement supplies them (TODO confirm). -->
<dependencies>
<!-- JUnit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>

<!-- Sibling RPC modules: the client framework (rpc-client) and rpc-protocol -->
<dependency>
<groupId>tj.cmcc.org</groupId>
<artifactId>rpc-client</artifactId>
</dependency>
<dependency>
<groupId>tj.cmcc.org</groupId>
<artifactId>rpc-protocol</artifactId>
</dependency>
</dependencies>
<build>
<finalName>rpc-client-impl</finalName>
</build>
</project>

相关文章