一个分布式rpc框架的实现方案(二)

时间:2022-03-02 16:53:17

之前实现了一个点对点的rpc功能框架,使用简单的协议将调用接口和参数传给服务端。由于最近在看zk相关的内容,于是准备引入zk进行分布式管理。
先看一下前一个简单版本的实现原理:
一个分布式rpc框架的实现方案(二)

这是一个简单的点对点的远程调用实现方案, client绑定了server的ip地址。那么如果有多个server,并且server的变化情况要及时通知client应该怎么实现? 引入了zookeeper后这个问题就比较容易解决了。以下两张图表达了基于zk的分布式rpc设计思想。
一个分布式rpc框架的实现方案(二)

分布式服务的改造比较简单,将前一个版本的中写死的ip地址改成从zk中查询服务地址,根据负载均衡策略选择一个server的ip, 请求这个ip对应的服务就ok了。使用zookeeper管理server集群的方式如下:
一个分布式rpc框架的实现方案(二)

服务方在zookeeper上创建一个临时节点,节点的名称是服务器的ip。客户端读zookeeper同目录下面的子节点列表,拿到server的ip地址,再选择一个ip作为请求方调用服务。如果服务器挂掉了,由于创建的是临时节点,因此zookeeper也会同步删掉这个ZNode,所以客户端就不会再看到对应的服务ip了。
具体代码如下:
1. 实现一个zk 的监控者

public class ZKWatcher implements Watcher {
protected static final int SESSION_TIME = 2000;
protected ZooKeeper zooKeeper;

protected String SERVER_PATH = "/rpcServer";
public void init() throws IOException, KeeperException, InterruptedException{}
public ZKWatcher(){
try {
init();
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 防止链接未完成就创建节点
protected CountDownLatch connectedSemaphore = new CountDownLatch(1);
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) { // 链接成功
connectedSemaphore.countDown();
}
}

SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public boolean createEphemeralFile(String file){
boolean result = false;
try {
String f = SERVER_PATH + "/" + file;
Stat stat = zooKeeper.exists(f, false);
if (stat == null) {
zNode = zooKeeper.create(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}else{
zooKeeper.setData(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(), -1);
}

result = true;
}catch (Exception e){
System.out.println("error to create zk server file");
e.printStackTrace();
}

return result;
}
}
  1. 服务端注册服务,先创建/rpcServer目录

    public class ZKServer extends ZKWatcher {

    /**
    * 注册服务端
    * @param ip
    * @return
    */
    public boolean registServer(String ip) {
    boolean result = false;
    try{
    if(StringUtils.isEmpty(ip)){
    ip = getLocalIp();
    }
    result = this.createEphemeralFile(ip);
    }catch (Exception e){

    }
    if (result){
    System.out.println("success to regist rpc server:" + ip);
    }else{
    System.out.println("fail to regist rpc server:" + ip);
    }

    return result;
    }

    public void init() throws IOException, KeeperException, InterruptedException {
    if (zooKeeper == null){
    zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this);
    connectedSemaphore.await();
    }

    Stat stat = zooKeeper.exists(SERVER_PATH, false);
    if (stat != null) {
    return;
    }
    String zNode = zooKeeper.create(SERVER_PATH, ("init:" + getLocalIp()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println("create server path:" + zNode);
    }
    }

服务provider在init时将自己ip注册到zk集群上

public class SocketProviderV2{
……
public Object provide() throws Throwable {
ServerSocket serverSocket = new ServerSocket(dataSource.getPort());
// 注册zk服务器
zkServer.registServer(null);

while (true) {
Socket socket = null;
try {
// open socket
socket = serverSocket.accept();
this.getRpcRequest(socket);
// 拿到请求后续处理
……
} catch (Exception e) {
e.printStackTrace();
}
}
}
  1. rpc客户端查询zk中的服务节点,选择一个作为请求地址

    public class ZKClient extends ZKWatcher {
    public List<String> getServerIps() throws KeeperException, InterruptedException {
    List<String> ips = new ArrayList<String>();
    try{
    ips = zooKeeper.getChildren(SERVER_PATH, false);
    }catch (Exception e){
    e.printStackTrace();
    }

    return ips;
    }

    // 随机数实现负载均衡
    Random random = new Random(1000);
    public String getBalanceIp() throws KeeperException, InterruptedException {
    List<String> ips = this.getServerIps();

    int size = ips.size();
    int index = (int)(Math.random() * size);
    return ips.get(index);
    }

    public void init() throws IOException, KeeperException, InterruptedException {
    if (zooKeeper == null){
    zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this);
    connectedSemaphore.await();
    }

    System.out.println("I finished init");
    }
    }

    socket client实现

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    StringBuffer stream = this.buildRpcRequest(proxy, method, args);

    Object object = null;
    try{
    long startTime = System.currentTimeMillis();
    System.out.println("New rpc client send " + stream.toString() + " time:" + startTime);
    String serverIp = zkClient.getBalanceIp();
    System.out.println("server ip is " + serverIp);

    // 调用远程服务器数据
    ……
    }catch (IOException e){
    e.printStackTrace();
    object = e.toString();
    }

    return object;
    }

    其他的代码详见之前版本。

我们看一下调用效果:启动3虚拟机+2个本地zk 服务端,部署分布式zk环境。 将rpc provider代码打成jar包传到3个虚拟机中, 运营服务程序。
服务1

root@master:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.128

服务2

root@slave01:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.129

服务3

root@slave02:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.130

调出zkclient,查看一下zk集群的node情况。
一个分布式rpc框架的实现方案(二)

OK,服务全部注册成功。 本地运行rpc client, 
Client: 调用到ip 130的机器上

 I finished init
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467282617593
server ip is 192.168.226.130
client read from service:success aop, result=i am real printtest rpc time:14
success aop, result=i am real printtest rpc

130 server:
一个分布式rpc框架的实现方案(二)

再次运行Client: 这次调用到了ip 128的机器上。

I finished init
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467280709114
server ip is 192.168.226.128
client read from service:success aop, result=i am real printtest rpc time:17
success aop, result=i am real printtest rpc

128 server:
一个分布式rpc框架的实现方案(二)

未完待续,下次将解决更加细节的问题,例如线程池的维护和NIO。