ZooKeeper
ZooKeeper是一种分布式协调服务, 解决应用程序的分布式带来的问题。
1 分布式应用
分布式应用可以在给定时间(同时)在网络中的多个系统上运行,通过协调它们以快速有效的方式完成特定任务。
分布式应用正在运行的一组系统称为集群,而在集群中运行的每台机器被称为节点。
分布式应用有两部分, Server(服务器) 和 Client(客户端) 应用程序。服务器应用程序实际上是分布式的,并具有通用接口,以便客户端可以连接到集群中的任何服务器并获得相同的结果。 客户端应用程序是与分布式应用进行交互的工具。
2 ZooKeeper的基本概念
1、Architecture(架构):客户端,服务器, Ensemble( 服务器组,大于等于3台服务器), Leader( 服务器节点), Follower
2、Hierarchical namespace(层次命名空间): ZooKeeper节点称为 znode 。每个znode由一个名称标识,并用路径(/)序列分隔。 Znode被分为持久(persistent)节点,顺序(sequential)节点和临时(ephemeral)节点。
3、Session(会话): 会话中的请求按FIFO顺序执行。一旦客户端连接到服务器,将建立会话并向客户端分配会话ID 。 客户端以特定的时间间隔发送心跳以保持会话有效。
4、Watches(监视): 使客户端收到关于ZooKeeper集合中的更改的通知。
znode:
顺序节点为节点的一种特性,也就是,持久节点和临时节点都可以设置为顺序节点。所以znode一共有4种类型:持久的、临时的,持久顺序的,临时顺序的。
不应该在节点存储过多的数据。Zk规定节点的数据大小不能超过1M,但实际上我们在znode的数据量应该尽可能小,因为数据过大会导致zk的性能明显下降。如果确实需要存储大量的数据,一般解决方法是在另外的分布式数据库(例如redis)中保存这部分数据,然后在znode中我们只保留这个数据库中保存位置的索引即可。
假设客户端C1对znode /config写入一些配置信息,如果另一个客户端C2同时更新了这个znode,此时C1的版本号已经过期,C1调用setData一定不会成功。这正是版本机制有效避免了数据更新时出现的先后顺序问题。
3 Zookeeper 工作流
读取: 读取由特定连接的znode在内部执行,因此不需要与集群进行交互。
写入: 写入过程由leader节点处理。leader将写入请求转发到所有znode,并等待znode的回复。如果一半的znode回复,则写入过程完成。
四个节点类似于三个节点,额外节点不用于任何目的,因此,最好添加奇数的节点,例如3,5,7。
写入过程比ZooKeeper集合中的读取过程要贵,因为所有节点都需要在数据库中写入相同的数据。因此,对于平衡的环境拥有较少数量(例如3,5,7)的节点比拥有大量的节点要好。
4 Zookeeper leader选举
Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举。
(1) 服务器初始化启动。
(2) 服务器运行期间无法和Leader保持连接。
选举过程:
(1) 每个Server发出一个投票。由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID,使用(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。
(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下
· 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
· 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。
对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2, 0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了Leader。
(5) 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。
5 Zookeeper下载、安装和使用
ZooKeeper软件的文件和目录
- bin目录
zk的可执行脚本目录,包括zk服务进程,zk客户端,等脚本。其中,.sh是Linux环境下的脚本,.cmd是Windows环境下的脚本。
- conf目录
配置文件目录。zoo_sample.cfg为样例配置文件,需要修改为自己的名称,一般为zoo.cfg。log4j.properties为日志配置文件。
- lib
zk依赖的包。
- contrib目录
一些用于操作zk的工具包。
- recipes目录
zk某些用法的代码示例
配置信息
看到 conf/zoo_example.cfg 的配置文件,将其修改为zoo.cfg
tickTime = 2000
dataDir = /tmp/zookeeper 默认的配置路径,可以修改。用于配置存储快照文件的目录
clientPort = 2181
initLimit = 5
syncLimit = 2
启动服务器:
启动:
hadoop@ubuntu:/opt/zookeeper-3.4.13$ bin/zkServer.sh start
下面是启动成功:
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hadoop@ubuntu:/opt/zookeeper-3.4.13$
打开客户端,启动CLI, 命令行界面(CLI)用于与ZooKeeper集合进行交互以进行开发。
hadoop@ubuntu:/opt/zookeeper-3.4.13$ bin/zkCli.sh
将连接到ZooKeeper服务器,你应该得到以下响应:
Connecting to localhost:2181
2019-03-13 05:37:06,511 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
2019-03-13 05:37:06,528 [myid:] - INFO [main:Environment@100] - Client environment:host.name=ubuntu
2019-03-13 05:37:06,528 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_201
2019-03-13 05:37:06,534 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2019-03-13 05:37:06,535 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/opt/modules/jdk1.8.0_201/jre
2019-03-13 05:37:06,535 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/opt/zookeeper-3.4.13/bin/../build/classes:/opt/zookeeper-3.4.13/bin/../build/lib/*.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/opt/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/opt/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/opt/zookeeper-3.4.13/bin/../conf:.:/opt/modules/jdk1.8.0_201/lib:/opt/modules/jdk1.8.0_201/jre/lib:
2019-03-13 05:37:06,536 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-03-13 05:37:06,536 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2019-03-13 05:37:06,537 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2019-03-13 05:37:06,537 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2019-03-13 05:37:06,542 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2019-03-13 05:37:06,543 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.15.0-46-generic
2019-03-13 05:37:06,546 [myid:] - INFO [main:Environment@100] - Client environment:user.name=hadoop
2019-03-13 05:37:06,547 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/home/hadoop
2019-03-13 05:37:06,548 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/opt/zookeeper-3.4.13
2019-03-13 05:37:06,552 [myid:] - INFO [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@5ce65a89
Welcome to ZooKeeper!
2019-03-13 05:37:06,720 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2019-03-13 05:37:07,262 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session
[zk: localhost:2181(CONNECTING) 0] 2019-03-13 05:37:07,513 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10011c8df600000, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
直接执行zkCli.sh命令默认以主机号 127.0.0.1,端口号 2181 来连接zk,如果要连接不同机器上的zk,可以使用 -server 参数,例如:
bin/zkCli.sh -server 192.168.0.1:2181
6 Zookeeper使用例子
一旦客户端启动,可以执行以下操作:
1、创建znode
2、获取数据
3、监视znode的变化
4、设置数据
5、创建znode的子节点
6、列出znode的子节点
7、检查状态
8、移除/删除znode
用一个例子逐个了解上面的命令:
1、创建znode
创建一个节点:
[zk: localhost:2181(CONNECTED) 1] create /FirstZnode “Myfirstzookeeper-app"
Created /FirstZnode
顺序节点
[zk: localhost:2181(CONNECTED) 2] create -s /FirstZnode second-data
Created /FirstZnode0000000001
临时节点(Ephemeral,短暂的)。当客户端断开连接时,临时节点将被删除。
[zk: localhost:2181(CONNECTED) 3] create -e /SecondZnode “Ephemeral-data"
Created /SecondZnode
2、获取数据
[zk: localhost:2181(CONNECTED) 4] get /FirstZnode
“Myfirstzookeeper-app"
cZxid = 0x2
ctime = Wed Mar 13 05:51:45 PDT 2019
mZxid = 0x2
mtime = Wed Mar 13 05:51:45 PDT 2019
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 24
numChildren = 0
返回znode的关联数据和指定znode的元数据。你将获得信息,例如上次修改数据的时间,修改的位置以及数据的相关信息。此CLI还用于分配监视器以显示数据相关的通知。
要访问顺序节点,必须输入znode的完整路径(带10位数字)
[zk: localhost:2181(CONNECTED) 7] get /FirstZnode0000000001
second-data
cZxid = 0x3
ctime = Wed Mar 13 05:52:39 PDT 2019
mZxid = 0x3
mtime = Wed Mar 13 05:52:39 PDT 2019
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
3、监视znode的变化
[zk: localhost:2181(CONNECTED) 8] get /FirstZnode 1 这里设置一下1。只能在 get 命令中设置watch。
“Myfirstzookeeper-app"
cZxid = 0x2
ctime = Wed Mar 13 05:51:45 PDT 2019
mZxid = 0x2
mtime = Wed Mar 13 05:51:45 PDT 2019
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 24
numChildren = 0
输出类似于普通的 get 命令,但它会等待后台等待znode更改
4、设置数据
[zk: localhost:2181(CONNECTED) 9] set /SecondZnode Data-updated
cZxid = 0x4
ctime = Wed Mar 13 05:53:09 PDT 2019
mZxid = 0x5
mtime = Wed Mar 13 06:01:07 PDT 2019
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x10011c8df600000
dataLength = 12
numChildren = 0
5、创建znode的子节点
[zk: localhost:2181(CONNECTED) 15] create /FirstZnode/Child1 firstchildren
Created /FirstZnode/Child1
[zk: localhost:2181(CONNECTED) 16] create /FirstZnode/Child2 “secondchildren"
Created /FirstZnode/Child2
6、列出znode的子节点
[zk: localhost:2181(CONNECTED) 18] ls /FirstZnode
[Child2, Child1]
7、检查状态
[zk: localhost:2181(CONNECTED) 19] stat /FirstZnode
cZxid = 0x2
ctime = Wed Mar 13 05:51:45 PDT 2019
mZxid = 0x2
mtime = Wed Mar 13 05:51:45 PDT 2019
pZxid = 0x7
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 24
numChildren = 2
包含时间戳,版本号,ACL,数据长度和子znode等细项。
8、移除/删除znode
[zk: localhost:2181(CONNECTED) 20] rmr /FirstZnode
WATCHER::
WatchedEvent state:SyncConnected type:NodeDeleted path:/FirstZnode
[zk: localhost:2181(CONNECTED) 21] get /FirstZnode
Node does not exist: /FirstZnode
Hadoop是大数据行业发展的推动力。Hadoop依靠ZooKeeper进行配置管理和协调。
ZooKeeper在Hadoop中的作用:
假设 Hadoop集群 桥接100个或更多的商品服务器。那么,就需要协调和命名服务。因此涉及大量节点的计算,每个节点需要彼此同步,知道在哪里访问服务器,以及知道如何配置它们。在这个时间点,Hadoop集群需要跨节点服务器。ZooKeeper提供跨节点同步的功能,并确保跨越Hadoop项目的任务被序列化和同步化。
多个ZooKeeper服务器支持大型Hadoop集群。每个客户端机器与ZooKeeper服务器之一通信以检索和更新其同步信息。
7 ZooKeeper API
(在java程序中对zookeeper进行操作,区别于上面的命令行操作)
ZooKeeper API的核心部分是ZooKeeper类。它提供了在其构造函数中连接ZooKeeper集合的选项,并具有以下方法:
- connect - 连接到ZooKeeper集合
- create- 创建znode
- exists- 检查znode是否存在及其信息
- getData - 从特定的znode获取数据
- setData - 在特定的znode中设置数据
- getChildren - 获取特定znode中的所有子节点
- delete - 删除特定的znode及其所有子项
- close - 关闭连接
首先要把安装包中的jar包导入(在解压包里)。
connect操作
package ZooKeeper;
// import java classes
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
// import zookeeper classes
public class ZooKeeperConnection {
// declare zookeeper instance to access ZooKeeper ensemble
private ZooKeeper zoo;
final CountDownLatch connectedSignal = new CountDownLatch(1);
// Method to connect zookeeper ensemble.
public ZooKeeper connect(String host) throws IOException,InterruptedException {
zoo = new ZooKeeper(host,5000,new Watcher() {
public void process(WatchedEvent we) {
if (we.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
return zoo;
}
// Method to disconnect from zookeeper server
public void close() throws InterruptedException {
zoo.close();
}
}
创建zookeeper节点操作
package ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class ZKCreate {
// create static instance for zookeeper class.
private static ZooKeeper zk;
// create static instance for ZooKeeperConnection class.
private static ZooKeeperConnection conn;
// Method to create znode in zookeeper ensemble
public static void create(String path, byte[] data) throws
KeeperException,InterruptedException {
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
public static void main(String[] args) {
// znode path
String path = "/MyFirstZnode"; // Assign path to znode
// data in byte array
byte[] data = "My first zookeeper app".getBytes(); // Declare data
try {
conn = new ZooKeeperConnection();
zk = conn.connect("localhost");
create(path, data); // Create the data to the specified path
conn.close();
} catch (Exception e) {
System.out.println(e.getMessage()); //Catch error message
}
}
}
其余操作类似,不再列出。