之前实现了一个点对点的rpc功能框架,使用简单的协议将调用接口和参数传给服务端。由于最近在看zk相关的内容,于是准备引入zk进行分布式管理。
先看一下前一个简单版本的实现原理:
这是一个简单的点对点的远程调用实现方案, client绑定了server的ip地址。那么如果有多个server,并且server的变化情况要及时通知client应该怎么实现? 引入了zookeeper后这个问题就比较容易解决了。以下两张图表达了基于zk的分布式rpc设计思想。
分布式服务的改造比较简单,将前一个版本的中写死的ip地址改成从zk中查询服务地址,根据负载均衡策略选择一个server的ip, 请求这个ip对应的服务就ok了。使用zookeeper管理server集群的方式如下:
服务方在zookeeper上创建一个临时节点,节点的名称是服务器的ip。客户端读zookeeper同目录下面的子节点列表,拿到server的ip地址,再选择一个ip作为请求方调用服务。如果服务器挂掉了,由于创建的是临时节点,因此zookeeper也会同步删掉这个ZNode,所以客户端就不会再看到对应的服务ip了。
具体代码如下:
1. 实现一个zk 的监控者
public class ZKWatcher implements Watcher {
protected static final int SESSION_TIME = 2000;
protected ZooKeeper zooKeeper;
protected String SERVER_PATH = "/rpcServer";
public void init() throws IOException, KeeperException, InterruptedException{}
public ZKWatcher(){
try {
init();
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 防止链接未完成就创建节点
protected CountDownLatch connectedSemaphore = new CountDownLatch(1);
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) { // 链接成功
connectedSemaphore.countDown();
}
}
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public boolean createEphemeralFile(String file){
boolean result = false;
try {
String f = SERVER_PATH + "/" + file;
Stat stat = zooKeeper.exists(f, false);
if (stat == null) {
zNode = zooKeeper.create(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}else{
zooKeeper.setData(f, ("time:" + df.format(new Date()) + " " + getLocalIp()).getBytes(), -1);
}
result = true;
}catch (Exception e){
System.out.println("error to create zk server file");
e.printStackTrace();
}
return result;
}
}
-
服务端注册服务,先创建/rpcServer目录
public class ZKServer extends ZKWatcher {
/**
* 注册服务端
* @param ip
* @return
*/
public boolean registServer(String ip) {
boolean result = false;
try{
if(StringUtils.isEmpty(ip)){
ip = getLocalIp();
}
result = this.createEphemeralFile(ip);
}catch (Exception e){
}
if (result){
System.out.println("success to regist rpc server:" + ip);
}else{
System.out.println("fail to regist rpc server:" + ip);
}
return result;
}
public void init() throws IOException, KeeperException, InterruptedException {
if (zooKeeper == null){
zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this);
connectedSemaphore.await();
}
Stat stat = zooKeeper.exists(SERVER_PATH, false);
if (stat != null) {
return;
}
String zNode = zooKeeper.create(SERVER_PATH, ("init:" + getLocalIp()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("create server path:" + zNode);
}
}
服务provider在init时将自己ip注册到zk集群上
public class SocketProviderV2{
……
public Object provide() throws Throwable {
ServerSocket serverSocket = new ServerSocket(dataSource.getPort());
// 注册zk服务器
zkServer.registServer(null);
while (true) {
Socket socket = null;
try {
// open socket
socket = serverSocket.accept();
this.getRpcRequest(socket);
// 拿到请求后续处理
……
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
rpc客户端查询zk中的服务节点,选择一个作为请求地址
public class ZKClient extends ZKWatcher {
public List<String> getServerIps() throws KeeperException, InterruptedException {
List<String> ips = new ArrayList<String>();
try{
ips = zooKeeper.getChildren(SERVER_PATH, false);
}catch (Exception e){
e.printStackTrace();
}
return ips;
}
// 随机数实现负载均衡
Random random = new Random(1000);
public String getBalanceIp() throws KeeperException, InterruptedException {
List<String> ips = this.getServerIps();
int size = ips.size();
int index = (int)(Math.random() * size);
return ips.get(index);
}
public void init() throws IOException, KeeperException, InterruptedException {
if (zooKeeper == null){
zooKeeper = new ZooKeeper(getLocalIp(), SESSION_TIME, this);
connectedSemaphore.await();
}
System.out.println("I finished init");
}
}socket client实现
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
StringBuffer stream = this.buildRpcRequest(proxy, method, args);
Object object = null;
try{
long startTime = System.currentTimeMillis();
System.out.println("New rpc client send " + stream.toString() + " time:" + startTime);
String serverIp = zkClient.getBalanceIp();
System.out.println("server ip is " + serverIp);
// 调用远程服务器数据
……
}catch (IOException e){
e.printStackTrace();
object = e.toString();
}
return object;
}
我们看一下调用效果:启动3虚拟机+2个本地zk 服务端,部署分布式zk环境。 将rpc provider代码打成jar包传到3个虚拟机中, 运营服务程序。
服务1
root@master:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.128
服务2
root@slave01:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.129
服务3
root@slave02:/home/caisheng# java -jar lengren-rpc.jar
begin start service v2
success to regist rpc server:192.168.226.130
OK,服务全部注册成功。 本地运行rpc client,
Client: 调用到ip 130的机器上
I finished init
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467282617593
server ip is 192.168.226.130
client read from service:success aop, result=i am real printtest rpc time:14
success aop, result=i am real printtest rpc
再次运行Client: 这次调用到了ip 128的机器上。
I finished init
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:proxyLearn.beans.PrintText|##|method:print|##|params:java.lang.String+test rpc, time:1467280709114
server ip is 192.168.226.128
client read from service:success aop, result=i am real printtest rpc time:17
success aop, result=i am real printtest rpc
未完待续,下次将解决更加细节的问题,例如线程池的维护和NIO。