zookeeper集群
配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每个服务器上的数据是相同的,每一个服务器均可以对外提供读和写的服务,这点和redis是相同的,即对客户端来讲每个服务器都是平等的。
这篇主要分析leader的选择机制,zookeeper提供了三种方式:
- leaderelection
- authfastleaderelection
- fastleaderelection
默认的算法是fastleaderelection,所以这篇主要分析它的选举机制。
选择机制中的概念
服务器id
比如有三台服务器,编号分别是1,2,3。
编号越大在选择算法中的权重越大。
数据id
服务器中存放的最大数据id.
值越大说明数据越新,在选举算法中数据越新权重越大。
逻辑时钟
或者叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。
选举状态
- looking,竞选状态。
- following,随从状态,同步leader状态,参与投票。
- observing,观察状态,同步leader状态,不参与投票。
- leading,领导者状态。
选举消息内容
在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。
- 服务器id
- 数据id
- 逻辑时钟
- 选举状态
选举流程图
因为每个服务器都是独立的,在启动时均从初始状态开始参与选举,下面是简易流程图。
选举状态图
描述leader选择过程中的状态变化,这是假设全部实例中均没有数据,假设服务器启动顺序分别为:a,b,c。
源码分析
quorumpeer
主要看这个类,只有looking状态才会去执行选举算法。每个服务器在启动时都会选择自己做为领导,然后将投票信息发送出去,循环一直到选举出领导为止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
public void run() {
//.......
try {
while (running) {
switch (getpeerstate()) {
case looking:
if ( boolean .getboolean( "readonlymode.enabled" )) {
//...
try {
//投票给自己...
setcurrentvote(makelestrategy().lookforleader());
} catch (exception e) {
//...
} finally {
//...
}
} else {
try {
//...
setcurrentvote(makelestrategy().lookforleader());
} catch (exception e) {
//...
}
}
break ;
case observing:
//...
break ;
case following:
//...
break ;
case leading:
//...
break ;
}
}
} finally {
//...
}
}
|
fastleaderelection
它是zookeeper默认提供的选举算法,核心方法如下:具体的可以与本文上面的流程图对照。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
public vote lookforleader() throws interruptedexception {
//...
try {
hashmap< long , vote> recvset = new hashmap< long , vote>();
hashmap< long , vote> outofelection = new hashmap< long , vote>();
int nottimeout = finalizewait;
synchronized ( this ){
//给自己投票
logicalclock.incrementandget();
updateproposal(getinitid(), getinitlastloggedzxid(), getpeerepoch());
}
//将投票信息发送给集群中的每个服务器
sendnotifications();
//循环,如果是竞选状态一直到选举出结果
while ((self.getpeerstate() == serverstate.looking) &&
(!stop)){
notification n = recvqueue.poll(nottimeout,
timeunit.milliseconds);
//没有收到投票信息
if (n == null ){
if (manager.havedelivered()){
sendnotifications();
} else {
manager.connectall();
}
//...
}
//收到投票信息
else if (self.getcurrentandnextconfigvoters().contains(n.sid)) {
switch (n.state) {
case looking:
// 判断投票是否过时,如果过时就清除之前已经接收到的信息
if (n.electionepoch > logicalclock.get()) {
logicalclock.set(n.electionepoch);
recvset.clear();
//更新投票信息
if (totalorderpredicate(n.leader, n.zxid, n.peerepoch,
getinitid(), getinitlastloggedzxid(), getpeerepoch())) {
updateproposal(n.leader, n.zxid, n.peerepoch);
} else {
updateproposal(getinitid(),
getinitlastloggedzxid(),
getpeerepoch());
}
//发送投票信息
sendnotifications();
} else if (n.electionepoch < logicalclock.get()) {
//忽略
break ;
} else if (totalorderpredicate(n.leader, n.zxid, n.peerepoch,
proposedleader, proposedzxid, proposedepoch)) {
//更新投票信息
updateproposal(n.leader, n.zxid, n.peerepoch);
sendnotifications();
}
recvset.put(n.sid, new vote(n.leader, n.zxid, n.electionepoch, n.peerepoch));
//判断是否投票结束
if (termpredicate(recvset,
new vote(proposedleader, proposedzxid,
logicalclock.get(), proposedepoch))) {
// verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizewait,
timeunit.milliseconds)) != null ){
if (totalorderpredicate(n.leader, n.zxid, n.peerepoch,
proposedleader, proposedzxid, proposedepoch)){
recvqueue.put(n);
break ;
}
}
if (n == null ) {
self.setpeerstate((proposedleader == self.getid()) ?
serverstate.leading: learningstate());
vote endvote = new vote(proposedleader,
proposedzxid, proposedepoch);
leaveinstance(endvote);
return endvote;
}
}
break ;
case observing:
//忽略
break ;
case following:
case leading:
//如果是同一轮投票
if (n.electionepoch == logicalclock.get()){
recvset.put(n.sid, new vote(n.leader, n.zxid, n.electionepoch, n.peerepoch));
//判断是否投票结束
if (termpredicate(recvset, new vote(n.leader,
n.zxid, n.electionepoch, n.peerepoch, n.state))
&& checkleader(outofelection, n.leader, n.electionepoch)) {
self.setpeerstate((n.leader == self.getid()) ?
serverstate.leading: learningstate());
vote endvote = new vote(n.leader, n.zxid, n.peerepoch);
leaveinstance(endvote);
return endvote;
}
}
//记录投票已经完成
outofelection.put(n.sid, new vote(n.leader,
ignorevalue, ignorevalue, n.peerepoch, n.state));
if (termpredicate(outofelection, new vote(n.leader,
ignorevalue, ignorevalue, n.peerepoch, n.state))
&& checkleader(outofelection, n.leader, ignorevalue)) {
synchronized ( this ){
logicalclock.set(n.electionepoch);
self.setpeerstate((n.leader == self.getid()) ?
serverstate.leading: learningstate());
}
vote endvote = new vote(n.leader, n.zxid, n.peerepoch);
leaveinstance(endvote);
return endvote;
}
break ;
default :
//忽略
break ;
}
} else {
log.warn( "ignoring notification from non-cluster member " + n.sid);
}
}
return null ;
} finally {
//...
}
}
|
判断是否已经胜出
默认是采用投票数大于半数则胜出的逻辑。
选举流程简述
目前有5台服务器,每台服务器均没有数据,它们的编号分别是1,2,3,4,5,按编号依次启动,它们的选择举过程如下:
- 服务器1启动,给自己投票,然后发投票信息,由于其它机器还没有启动所以它收不到反馈信息,服务器1的状态一直属于looking。
- 服务器2启动,给自己投票,同时与之前启动的服务器1交换结果,由于服务器2的编号大所以服务器2胜出,但此时投票数没有大于半数,所以两个服务器的状态依然是looking。
- 服务器3启动,给自己投票,同时与之前启动的服务器1,2交换信息,由于服务器3的编号最大所以服务器3胜出,此时投票数正好大于半数,所以服务器3成为领导者,服务器1,2成为小弟。
- 服务器4启动,给自己投票,同时与之前启动的服务器1,2,3交换信息,尽管服务器4的编号大,但之前服务器3已经胜出,所以服务器4只能成为小弟。
- 服务器5启动,后面的逻辑同服务器4成为小弟。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持服务器之家!
原文链接:http://www.cnblogs.com/ASPNET2008/p/6421571.html