ZK服务管理中心

时间:2022-07-26 00:31:17

ZK基础类及服务的注册与发现:

ZK服务管理中心ZK服务管理中心
package top.letsgogo.util;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;

import java.util.List;
import java.util.Map;

/**
 * Static helper around a single shared {@link ZkClient}: walks the node tree,
 * creates nodes that do not exist yet, and seeds the default node layout.
 *
 * @author panteng
 * @date 17-6-9.
 */
public class ZkManager {
    /** Comma-separated ZooKeeper connect string handed to the client. */
    private static final String ZKServers = "10.38.164.80:2181,10.38.164.80:2182,10.38.164.80:2183";
    /** Shared client, created when the class is initialised; both timeout arguments are 10000. */
    private static final ZkClient zkClient = new ZkClient(ZKServers, 10000, 10000, new SerializableSerializer());

    /**
     * Recursively walks every node below {@code currentPath} and records each
     * node's data in {@code nodes}, keyed by the node's full path.
     *
     * <p>Any path containing the text {@code zookeeper} is skipped together with
     * its subtree. A node whose data cannot be read is reported on stdout and
     * its children are still visited.
     *
     * @param currentPath path whose descendants are collected; {@code "/"} for the whole tree
     * @param nodes       output map that receives {@code fullPath -> data} entries
     */
    public static void getAllNodesAndVlue(String currentPath, Map<String, Object> nodes) {
        try {
            List<String> children = zkClient.getChildren(currentPath);
            for (String child : children) {
                // The root already ends with "/", so only deeper levels need a separator.
                String childPath = "/".equals(currentPath)
                        ? currentPath + child
                        : currentPath + "/" + child;
                if (childPath.indexOf("zookeeper") > -1) {
                    continue;
                }
                try {
                    Object nodeValue = zkClient.readData(childPath);
                    nodes.put(childPath, nodeValue);
                } catch (Exception e) {
                    System.out.println("node路径:" + childPath);
                    e.printStackTrace();
                }
                getAllNodesAndVlue(childPath, nodes);
            }
        } catch (Exception e) {
            // getMessage() may be null; calling indexOf on it would throw a new
            // NullPointerException out of this handler and abort the whole walk.
            String message = e.getMessage();
            if (message != null && message.indexOf("KeeperErrorCode = NoNode for") > -1) {
                // The node vanished between listing and visiting; nothing to collect.
                return;
            }
            // Anything else is unexpected: report it instead of dropping it silently.
            e.printStackTrace();
        }
    }

    /**
     * Creates a node only if it does not exist yet.
     *
     * @param path  full path of the node to create
     * @param value data stored in the new node
     * @param mode  create mode passed through to the client
     * @return the value returned by {@code ZkClient.create}, or {@code ""} when the
     *         node already exists or creation failed (the failure is printed)
     */
    public static String addNode(String path, Object value, CreateMode mode) {
        try {
            if (zkClient.exists(path)) {
                return "";
            }
            return zkClient.create(path, value, mode);
        } catch (Exception e) {
            // Also covers losing a race with another creator between exists() and create().
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Seeds the persistent node layout (dao / service / controller, each with a
     * pool and a configration child). Nodes that already exist are left untouched.
     *
     * @param arges unused
     */
    public static void main2(String[] arges) {
        ZkManager.addNode("/dao", "data operation", CreateMode.PERSISTENT);
        ZkManager.addNode("/service", "service provider", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller", "work control", CreateMode.PERSISTENT);
        ZkManager.addNode("/dao/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/service/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/dao/configration", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/service/configration", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller/configration", "machine list", CreateMode.PERSISTENT);
        /* Disabled manual check: registers an ephemeral node and dumps the tree.
        ZkManager.addNode("/controller/api1", "api1", CreateMode.EPHEMERAL);
        Map<String, Object> map = new HashMap<>();
        ZkManager.getAllNodesAndVlue("/", map);
        for (Map.Entry entry : map.entrySet()) {
            System.out.println("path=" + entry.getKey() + " value=" + entry.getValue());
        }
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        */
    }
}
ZkManager
ZK服务管理中心ZK服务管理中心
package top.letsgogo.auto;

import com.google.common.base.Strings;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import top.letsgogo.util.ZkManager;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Startup hook of the dao service: registers this instance under the dao pool
 * once the application has started.
 *
 * @author panteng
 * @date 17-6-9.
 */
@Component
public class ServiceRegisterDiscover implements CommandLineRunner {
    /** Port this instance listens on, injected from {@code server.port}. */
    @Value("${server.port}")
    private String serverPort;

    private static String serviceNamePrefix = "dao-api-";
    /** Registration path prefix; the instance's {@code ip:port} is appended to it. */
    private static String path = "/dao/pool/" + serviceNamePrefix;
    private static Map<String, List<String>> nextServiceInfo = new HashMap<String, List<String>>();

    /**
     * Registers this instance as an ephemeral node named
     * {@code /dao/pool/dao-api-<ip>:<port>} and prints a confirmation when
     * {@code ZkManager.addNode} returns a non-empty path. Any exception is
     * printed and otherwise ignored.
     *
     * @param strings command-line arguments; not used
     */
    @Override
    public void run(String... strings) throws Exception {
        try {
            // Register this service with the management centre first.
            String nodePath = path + getIpAddress() + ":" + serverPort;
            String registeredPath = ZkManager.addNode(nodePath, "config", CreateMode.EPHEMERAL);
            if (Strings.isNullOrEmpty(registeredPath)) {
                return;
            }
            System.out.println(registeredPath + "服务注册成功");
            // Discovery of the services this one calls would follow here.
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up a local IPv4 address.
     *
     * @return the host address of the first IPv4 address found on an interface
     *         that is up, not loopback and not virtual; {@code ""} when there is
     *         none or the lookup fails (the failure is printed)
     */
    public static String getIpAddress() {
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            while (interfaces.hasMoreElements()) {
                NetworkInterface candidate = interfaces.nextElement();
                boolean usable = !candidate.isLoopback() && !candidate.isVirtual() && candidate.isUp();
                if (!usable) {
                    continue;
                }
                for (Enumeration<InetAddress> addresses = candidate.getInetAddresses();
                        addresses.hasMoreElements(); ) {
                    InetAddress address = addresses.nextElement();
                    // instanceof is false for null, so no separate null check is needed.
                    if (address instanceof Inet4Address) {
                        return address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
ServiceRegisterDiscover

 /dao/pool/dao-api-10.38.164.80:8080服务注册成功

[zk: localhost:2181(CONNECTED) 0] ls /dao/pool
[dao-api-10.38.164.80:8080]

 

服务查看:

ZK服务管理中心

[zk: localhost:2181(CONNECTED) 1] ls /dao/pool
[dao-api-10.38.164.80:8081, dao-api-10.38.164.80:8082, dao-api-10.38.164.80:8080]


[zk: localhost:2181(CONNECTED) 0] ls /
[service, controller, dao, zookeeper]

 

ZK服务管理中心

 


Service

ZK服务管理中心ZK服务管理中心
package top.letsgogo.auto;

import com.google.common.base.Strings;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import top.letsgogo.util.ZkManager;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.*;

/**
 * Startup hook of the service layer: registers this instance under the service
 * pool, discovers the instances of the services it calls, and subscribes to
 * changes of those services' pools.
 *
 * @author panteng
 * @date 17-6-9.
 */
@Component
public class ServiceRegisterDiscover implements CommandLineRunner {
    /** Port this instance listens on, injected from {@code server.port}. */
    @Value("${server.port}")
    private String serverPort;

    private final static String serviceNamePrefix = "service-api-";
    /** Registration path prefix; the instance's {@code ip:port} is appended to it. */
    private final static String path = "/service/pool/" + serviceNamePrefix;
    /**
     * Names of the services this service calls.
     */
    private final static String[] nextServiceName = new String[]{"dao-api-"};
    /**
     * Root paths of the called services; must correspond one-to-one, by index,
     * with the entries of nextServiceName.
     */
    private final static String[] nextServiceRootPath = new String[]{"/dao/pool"};

    /**
     * Known instances per called service name.
     * NOTE(review): ServiceListener also writes to this map from its change
     * callback; if that callback runs on another thread a plain HashMap is not
     * safe - confirm the threading model.
     */
    private static Map<String, List<String>> nextServiceInfo = new HashMap<String, List<String>>();

    /**
     * Registers this instance as an ephemeral node named
     * {@code /service/pool/service-api-<ip>:<port>}, prints a confirmation when
     * {@code ZkManager.addNode} returns a non-empty path, then runs discovery.
     * Any exception is printed and otherwise ignored.
     *
     * @param strings command-line arguments; not used
     */
    @Override
    public void run(String... strings) throws Exception {
        try {
            // Register this service with the management centre first.
            String getPath = ZkManager.addNode(path + getIpAddress() + ":" + serverPort, "config", CreateMode.EPHEMERAL);
            if (!Strings.isNullOrEmpty(getPath)) {
                System.out.println(getPath + "服务注册成功");
            }
            discoverNextServiceInfo();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Discovers the instances of every called service and subscribes to changes.
     *
     * <p>Walks the whole tree from {@code "/"}, files every node path containing
     * one of the called service names under that name, prints the result, then
     * registers a {@link ServiceListener} on each service root path.
     */
    public static void discoverNextServiceInfo() {
        // Ask the management centre for every node currently registered.
        Map<String, Object> allNodes = new HashMap<>();
        ZkManager.getAllNodesAndVlue("/", allNodes);
        for (String servicePath : allNodes.keySet()) {
            for (String serviceName : nextServiceName) {
                if (servicePath.indexOf(serviceName) < 0) {
                    continue;
                }
                List<String> serviceList = nextServiceInfo.get(serviceName);
                if (serviceList == null) {
                    serviceList = new ArrayList<String>();
                    nextServiceInfo.put(serviceName, serviceList);
                }
                // Running discovery again must not list the same instance twice.
                if (!serviceList.contains(servicePath)) {
                    serviceList.add(servicePath);
                }
            }
        }
        printNextServiceInfo();
        // Watch each pool so later joins and leaves are picked up.
        for (int i = 0; i < nextServiceRootPath.length; i++) {
            ZkManager.subscribeChildChanges(nextServiceRootPath[i], new ServiceListener(nextServiceName[i]));
        }
    }

    /**
     * Prints one line per called service listing its known instances. A service
     * whose instance list is {@code null} is printed with no instances.
     */
    public static void printNextServiceInfo() {
        for (Map.Entry<String, List<String>> entry : nextServiceInfo.entrySet()) {
            System.out.print("发现服务名称:" + entry.getKey() + " 服务实例:");
            List<String> instances = entry.getValue();
            // ServiceListener stores the list it receives as-is, and that may be null.
            if (instances != null) {
                for (String instance : instances) {
                    System.out.print(instance + ", ");
                }
            }
            System.out.println();
        }
    }

    /**
     * Looks up a local IPv4 address.
     *
     * @return the host address of the first IPv4 address found on an interface
     *         that is up, not loopback and not virtual; {@code ""} when there is
     *         none or the lookup fails (the failure is printed)
     */
    public static String getIpAddress() {
        try {
            Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
            while (allNetInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = allNetInterfaces.nextElement();
                if (netInterface.isLoopback() || netInterface.isVirtual() || !netInterface.isUp()) {
                    continue;
                }
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress ip = addresses.nextElement();
                    // instanceof is false for null, so no separate null check is needed.
                    if (ip instanceof Inet4Address) {
                        return ip.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * @return the live map of called service name to known instances (not a copy)
     */
    public static Map<String, List<String>> getNextServiceInfo() {
        return nextServiceInfo;
    }

    /**
     * Replaces the map of called service name to known instances.
     *
     * @param nextServiceInfo the new map; stored as-is
     */
    public static void setNextServiceInfo(Map<String, List<String>> nextServiceInfo) {
        ServiceRegisterDiscover.nextServiceInfo = nextServiceInfo;
    }
}
ServiceRegisterDiscover
ZK服务管理中心ZK服务管理中心
package top.letsgogo.auto;

import org.I0Itec.zkclient.IZkChildListener;

import java.util.ArrayList;
import java.util.List;

/**
 * Child listener for one called service: when the children of the watched pool
 * change, it replaces that service's instance list in
 * {@link ServiceRegisterDiscover} and prints the new state.
 *
 * @author panteng
 * @date 17-6-10.
 */
public class ServiceListener implements IZkChildListener {
    /** Key under which the instance list is stored in ServiceRegisterDiscover. */
    String serviceName;

    /**
     * @param serviceName name of the service whose pool this listener watches
     */
    public ServiceListener(String serviceName) {
        this.serviceName = serviceName;
    }

    /**
     * Replaces the stored instance list of this service with the current children.
     *
     * <p>Each child name is stored as a full path ({@code parent + "/" + child})
     * so entries have the same form as the ones the initial discovery stores.
     * A {@code null} child list is stored as an empty list.
     *
     * @param s    path of the watched parent node
     * @param list current child names, or {@code null}
     */
    @Override
    public void handleChildChange(String s, List<String> list) throws Exception {
        System.out.println("服务" + serviceName + "发生了变化");
        // Copy into a fresh mutable list so later add() calls on it are safe.
        List<String> instances = new ArrayList<String>();
        if (list != null) {
            for (String child : list) {
                // The root already ends with "/", so only deeper parents need a separator.
                instances.add("/".equals(s) ? s + child : s + "/" + child);
            }
        }
        ServiceRegisterDiscover.getNextServiceInfo().put(serviceName, instances);
        ServiceRegisterDiscover.printNextServiceInfo();
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }
}
ServiceListener
ZK服务管理中心ZK服务管理中心
package top.letsgogo.util;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;

import java.util.List;
import java.util.Map;

/**
 * Static helper around a single shared {@link ZkClient}: walks the node tree,
 * creates nodes that do not exist yet, subscribes child listeners, and seeds
 * the default node layout.
 *
 * @author panteng
 * @date 17-6-9.
 */
public class ZkManager {
    /** Comma-separated ZooKeeper connect string handed to the client. */
    private static final String ZKServers = "10.38.164.80:2181,10.38.164.80:2182,10.38.164.80:2183";
    /** Shared client, created when the class is initialised; both timeout arguments are 10000. */
    private static final ZkClient zkClient = new ZkClient(ZKServers, 10000, 10000, new SerializableSerializer());

    /**
     * Recursively walks every node below {@code currentPath} and records each
     * node's data in {@code nodes}, keyed by the node's full path.
     *
     * <p>Any path containing the text {@code zookeeper} is skipped together with
     * its subtree. A node whose data cannot be read is reported on stdout and
     * its children are still visited.
     *
     * @param currentPath path whose descendants are collected; {@code "/"} for the whole tree
     * @param nodes       output map that receives {@code fullPath -> data} entries
     */
    public static void getAllNodesAndVlue(String currentPath, Map<String, Object> nodes) {
        try {
            List<String> children = zkClient.getChildren(currentPath);
            for (String child : children) {
                // The root already ends with "/", so only deeper levels need a separator.
                String childPath = "/".equals(currentPath)
                        ? currentPath + child
                        : currentPath + "/" + child;
                if (childPath.indexOf("zookeeper") > -1) {
                    continue;
                }
                try {
                    Object nodeValue = zkClient.readData(childPath);
                    nodes.put(childPath, nodeValue);
                } catch (Exception e) {
                    System.out.println("node路径:" + childPath);
                    e.printStackTrace();
                }
                getAllNodesAndVlue(childPath, nodes);
            }
        } catch (Exception e) {
            // getMessage() may be null; calling indexOf on it would throw a new
            // NullPointerException out of this handler and abort the whole walk.
            String message = e.getMessage();
            if (message != null && message.indexOf("KeeperErrorCode = NoNode for") > -1) {
                // The node vanished between listing and visiting; nothing to collect.
                return;
            }
            // Anything else is unexpected: report it instead of dropping it silently.
            e.printStackTrace();
        }
    }

    /**
     * Creates a node only if it does not exist yet.
     *
     * @param path  full path of the node to create
     * @param value data stored in the new node
     * @param mode  create mode passed through to the client
     * @return the value returned by {@code ZkClient.create}, or {@code ""} when the
     *         node already exists or creation failed (the failure is printed)
     */
    public static String addNode(String path, Object value, CreateMode mode) {
        try {
            if (zkClient.exists(path)) {
                return "";
            }
            return zkClient.create(path, value, mode);
        } catch (Exception e) {
            // Also covers losing a race with another creator between exists() and create().
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Registers a child listener on a node, but only when that node currently
     * exists; otherwise the call does nothing.
     *
     * @param nodePath path of the node whose children are watched
     * @param listener callback to register
     */
    public static void subscribeChildChanges(String nodePath, IZkChildListener listener) {
        if (zkClient.exists(nodePath)) {
            zkClient.subscribeChildChanges(nodePath, listener);
        }
    }

    /**
     * Seeds the persistent node layout (dao / service / controller, each with a
     * pool and a configration child). Nodes that already exist are left untouched.
     *
     * @param arges unused
     */
    public static void main2(String[] arges) {
        ZkManager.addNode("/dao", "data operation", CreateMode.PERSISTENT);
        ZkManager.addNode("/service", "service provider", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller", "work control", CreateMode.PERSISTENT);
        ZkManager.addNode("/dao/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/service/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller/pool", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/dao/configration", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/service/configration", "machine list", CreateMode.PERSISTENT);
        ZkManager.addNode("/controller/configration", "machine list", CreateMode.PERSISTENT);
        /* Disabled manual check: registers an ephemeral node and dumps the tree.
        ZkManager.addNode("/controller/api1", "api1", CreateMode.EPHEMERAL);
        Map<String, Object> map = new HashMap<>();
        ZkManager.getAllNodesAndVlue("/", map);
        for (Map.Entry entry : map.entrySet()) {
            System.out.println("path=" + entry.getKey() + " value=" + entry.getValue());
        }
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        */
    }
}
ZkManager

ZK服务管理中心

/service/pool/service-api-10.232.36.21:8083服务注册成功
发现服务名称:dao-api-  服务实例:/dao/pool/dao-api-10.38.164.80:8081,  /dao/pool/dao-api-10.38.164.80:8080,  /dao/pool/dao-api-10.38.164.80:8082,

 

服务dao-api-发生了变化
发现服务名称:dao-api-  服务实例:dao-api-10.38.164.80:8081,  dao-api-10.38.164.80:8082,

ZK服务管理中心

代码仓库:https://github.com/luckyPT/ZkManager