Zookeeper实现数据的发布和订阅

时间:2022-04-04 20:40:47
使用场景 

      当一个对象的改变,需要通知其他对象而且不知道要通知多少个对象,可以使用发布订阅模式 。在分布式中的应用有配置管理(Configuration Management) 、集群管理(Group Membership)/服务发现。

        配置管理
        如果集群中的机器的程序配置都是一样的,而且需要动态修改,我们可以使用发布和订阅模式,把配置做统一的管理。
        故名思议就是一方把数据发布出来,另一方通过某种手段可以得到这些数据。通常数据订阅有两种方式:推模式和拉模式。
        推模式一般是服务器n主动向客户端推送信息,拉模式是客户端主动去服务器获取数据(通常是采用定时轮询的方式)。
        zk采用两种方式相结合,发布者将数据发布到zk集群节点上,订阅者通过一定的方法告诉服务器,我对哪个节点的数据感兴趣,那服务器在这些节点的数据发生变化时,就通知客户端,客户端得到通知后可以去服务器获取信息。

        集群管理(Group Membership)/服务发现
        集群管理,在分布式系统中,我们常常需要知道某个机器是否可用,传统开发中,可以通过ping某个机器来实现,ping得通说明对方是可用的,相反是不可用的。zk中我们所有的机器都注册一个临时节点,我们判断一个机器是否可用只需要判断这个节点在zk中是否存在就可以了,不需要直接去连接需要检查的机器,降低系统的复杂度。
        服务发现,对集群中的服务上下线做统一的管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息。而让某些监控服务器作为订阅方,订阅工作服务器的基本信息,当工作服务器的信息发生改变时,如服务上下线、服务器的角色等发生变更时,监控服务器可以得到通知,并响应这些变化。
       

原理解析

        架构图
        Zookeeper实现数据的发布和订阅Zookeeper实现数据的发布和订阅

配置管理

        config 节点用于配置管理
        Manage Server 发布消息
        Work Server 订阅消息

        Control Server 通过command节点写入命令信息,Manage Server订阅command节点的数据改变,来接收控制指令。

服务发现

        Server 用于服务器发现, Worker 启动的时候,在Server创建一个临时节点,Manage Server 通过监控Server节点的子节点的列表的改变,来更新内存中工作服务器列表的信息。
       

程序流程

        Manage Server 程序 工作流程
        Zookeeper实现数据的发布和订阅Zookeeper实现数据的发布和订阅

        Work Server程序主体流程
        Zookeeper实现数据的发布和订阅Zookeeper实现数据的发布和订阅


        类图
        Zookeeper实现数据的发布和订阅Zookeeper实现数据的发布和订阅
        SubscrbeZkClient 负责驱动WordServer和ManageServer        
        ServerConfig 记录WorkServer的配置信息
        ServerData 记录WordServer的基本信息
        

测试

服务发现
运行:SubscrbeZkClient.main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
start manage server.
start work server.
work server regist to /server.
work server regist to /server.
manage server : work server list changed, new list is 
[192.168.1.0]
 
start work server.
work server regist to /server.
manage server : work server list changed, new list is 
[192.168.1.1, 192.168.1.0]
 
start work server.
work server regist to /server.
manage server : work server list changed, new list is 
[192.168.1.1, 192.168.1.0, 192.168.1.2]
 
start work server.
work server regist to /server.
manage server : work server list changed, new list is 
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2]
 
start work server.
work server regist to /server.
manage server : work server list changed, new list is 
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]
 
敲回车键退出!

配置管理
在zkCli.sh 输入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[zk: localhost:2181(CONNECTED) 45] create /commands list
Created /commands
[zk: localhost:2181(CONNECTED) 46] set /commands create 
cZxid = 0x57ed
ctime = Wed Aug 24 11:44:21 CST 2016
mZxid = 0x57ee
mtime = Wed Aug 24 11:44:28 CST 2016
pZxid = 0x57ed
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
[zk: localhost:2181(CONNECTED) 47] set /commands modify 
cZxid = 0x57ed
ctime = Wed Aug 24 11:44:21 CST 2016
mZxid = 0x57f0
mtime = Wed Aug 24 11:44:34 CST 2016
pZxid = 0x57ed
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
控制台对应输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
manage server : cmd = list
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]
 
manage server : cmd = create
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
manage server : cmd = modify
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]
Work server : new Work server config is = ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root_modify]