理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。
这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class Message:
# command
MSG_ACCEPTOR_AGREE = 0 # 追随者约定
MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受
MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通
MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意
MSG_ACCEPT = 4 # 接受
MSG_PROPOSE = 5 # 提议
MSG_EXT_PROPOSE = 6 # 额外提议
MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息
def __init__( self , command = None ):
self .command = command
# 把收到的消息原原路返回,作为应答消息
def copyAsReply( self , message):
# 提议ID #当前的ID #发给谁 #谁发的
self .proposalID, self .instanceID, self .to, self .source = message.proposalID, message.instanceID, message.source, message.to
self .value = message.value # 发的信息
|
然后是利用socket,线程和队列实现的消息处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# 基于socket传递消息,封装网络传递消息
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
# 收取消息线程
class MPHelper(threading.Thread):
#
def __init__( self , owner):
self .owner = owner
threading.Thread.__init__( self )
def run( self ):
while not self .owner.abort: # 只要所有者线程没有结束,一直接受消息
try :
(bytes, addr) = self .owner.socket.recvfrom( 2048 ) # 收取消息
msg = pickle.loads(bytes) # 读取二进制数据转化为消息
msg.source = addr[ 1 ]
self .owner.queue.put(msg) # 队列存入消息
except Exception as e:
pass
def __init__( self , owner, port, timeout = 2 ):
threading.Thread.__init__( self )
self .owner = owner
self .abort = False
self .timeout = 2
self .port = port
self .socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信
self .socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000 ) # 通信参数
self .socket.bind(( "localhost" , port)) # 通信地址,ip,端口
self .socket.settimeout(timeout) # 超时设置
self .queue = queue.Queue() # 队列
self .helper = MessagePump.MPHelper( self ) # 接收消息
# 运行主线程
def run( self ):
self .helper.start() # 开启收消息的线程
while not self .abort:
message = self .waitForMessage() # 阻塞等待
self .owner.recvMessage(message) # 收取消息
# 等待消息
def waitForMessage( self ):
try :
msg = self .queue.get( True , 3 ) # 抓取数据,最多等待3s
return msg
except :
return None
# 发送消息
def sendMessage( self , message):
bytes = pickle.dumps(message) # 转化为二进制
address = ( "localhost" , message.to) # 地址ip,端口(ip,port)
self .socket.sendto(bytes, address)
return True
#是否停止收取消息
def doAbort( self ):
self .abort = True
|
再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 类的继承
# 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
def __init__( self , owner, port, timeout = 2 ):
MessagePump.__init__( self , owner, port, timeout) # 初始化父类
self .messages = set () # 集合避免重复
def waitForMessage( self ):
try :
msg = self .queue.get( True , 0.1 ) # 从队列抓取数据
self .messages.add(msg) # 添加消息
except Exception as e: # 处理异常
pass
# print(e)
if len ( self .messages) > 0 and random.random() < 0.95 : # Arbitrary!
msg = random.choice( list ( self .messages)) # 随机抓取消息发送
self .messages.remove(msg) # 删除消息
else :
msg = None
return msg
|
再来一个是记录类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
def __init__( self ):
self .protocols = {}
self .highestID = ( - 1 , - 1 ) # (port,count)
self .value = None
def addProtocol( self , protocol):
self .protocols[protocol.proposalID] = protocol
#
if protocol.proposalID[ 1 ] > self .highestID[ 1 ] or (
protocol.proposalID[ 1 ] = = self .highestID[ 1 ] and protocol.proposalID[ 0 ] > self .highestID[ 0 ]):
self .highestID = protocol.proposalID # 取得编号最大的协议
def getProtocol( self , protocolID):
return self .protocols[protocolID]
def cleanProtocols( self ):
keys = self .protocols.keys()
for k in keys:
protocol = self .protocols[k]
if protocol.state = = PaxosLeaderProtocol.STATE_ACCEPTED:
print ( "删除协议" )
del self .protocols[k]
|
下面就是Acceptor的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# 追随者
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
def __init__( self , port, leaders):
self .port = port
self .leaders = leaders
self .instances = {} # 接口列表
self .msgPump = MessagePump( self , self .port) # 消息传递器
self .failed = False
# 开始消息传送
def start( self ):
self .msgPump.start()
# 停止
def stop( self ):
self .msgPump.doAbort()
# 失败
def fail( self ):
self .failed = True
def recover( self ):
self .failed = False
# 发送消息
def sendMessage( self , message):
self .msgPump.sendMessage(message)
# 收消息,只收取为提议的消息
def recvMessage( self , message):
if message = = None :
return
if self .failed: # 失败状态不收取消息
return
if message.command = = Message.MSG_PROPOSE: # 判断消息是否为提议
if message.instanceID not in self .instances:
record = InstanceRecord() # 记录器
self .instances[message.instanceID] = record
protocol = PaxosAcceptorProtocol( self ) # 创建协议
protocol.recvProposal(message) # 收取消息
self .instances[message.instanceID].addProtocol(protocol)
else :
self .instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
# 通知客户端,
def notifyClient( self , protocol, message):
if protocol.state = = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知
self .instances[protocol.instanceID].value = message.value # 储存信息
print (u "协议被客户端接受 %s" % message.value)
# 获取最高同意的建议
def getHighestAgreedProposal( self , instance):
return self .instances[instance].highestID # (port,count)
# 获取接口数据
def getInstanceValue( self , instance):
return self .instances[instance].value
|
那再看下AcceptorProtocol的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
from Message import Message
class PaxosAcceptorProtocol( object ):
# State variables
STATE_UNDEFINED = - 1 # 协议没有定义的情况0
STATE_PROPOSAL_RECEIVED = 0 # 收到消息
STATE_PROPOSAL_REJECTED = 1 # 拒绝链接
STATE_PROPOSAL_AGREED = 2 # 同意链接
STATE_PROPOSAL_ACCEPTED = 3 # 同意请求
STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求
def __init__( self , client):
self .client = client
self .state = PaxosAcceptorProtocol.STATE_UNDEFINED
# 收取,只处理协议类型的消息
def recvProposal( self , message):
if message.command = = Message.MSG_PROPOSE: # 协议
self .proposalID = message.proposalID
self .instanceID = message.instanceID
(port, count) = self .client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号
# 检测编号处理消息协议
# 判断协议是否最高
if count < self .proposalID[ 1 ] or (count = = self .proposalID[ 1 ] and port < self .proposalID[ 0 ]):
self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意
print ( "同意协议:%s, %s " % (message.instanceID, message.value))
value = self .client.getInstanceValue(message.instanceID)
msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议
msg.copyAsReply(message)
msg.value = value
msg.sequence = (port, count)
self .client.sendMessage(msg) # 发送消息
else : # 不再接受比最高协议小的提议
self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
return self .proposalID
else :
# 错误重试
pass
# 过度
def doTransition( self , message): # 如果当前协议状态是接受连接,消息类型是接受
if self .state = = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command = = Message.MSG_ACCEPT:
self .state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议
msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息
msg.copyAsReply(message) # 拷贝并回复
for l in self .client.leaders:
msg.to = l
self .client.sendMessage(msg) # 给领导发送消息
self .notifyClient(message) # 通知自己
return True
raise Exception( "并非预期的状态和命令" )
# 通知 自己客户端
def notifyClient( self , message):
self .client.notifyClient( self , message)
|
接着看下Leader和LeaderProtocol实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
# 领导者
import threading
import Queue
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
# 定时监听
class HeartbeatListener(threading.Thread):
def __init__( self , leader):
self .leader = leader
self .queue = Queue.Queue() # 消息队列
self .abort = False
threading.Thread.__init__( self )
def newHB( self , message):
self .queue.put(message)
def doAbort( self ):
self .abort = True
def run( self ): # 读取消息
elapsed = 0
while not self .abort:
s = time.time()
try :
hb = self .queue.get( True , 2 )
# 设定规则,谁的端口号比较高,谁就是领导
if hb.source > self .leader.port:
self .leader.setPrimary( False )
except :
self .leader.setPrimary( True )
# 定时发送
class HeartbeatSender(threading.Thread):
def __init__( self , leader):
threading.Thread.__init__( self )
self .leader = leader
self .abort = False
def doAbort( self ):
self .abort = True
def run( self ):
while not self .abort:
time.sleep( 1 )
if self .leader.isPrimary:
msg = Message(Message.MSG_HEARTBEAT)
msg.source = self .leader.port
for leader in self .leader.leaders:
msg.to = leader
self .leader.sendMessage(msg)
def __init__( self , port, leaders = None , acceptors = None ):
self .port = port
if leaders = = None :
self .leaders = []
else :
self .leaders = leaders
if acceptors = = None :
self .acceptors = []
else :
self .acceptors = acceptors
self .group = self .leaders + self .acceptors # 集合合并
self .isPrimary = False # 自身是不是领导
self .proposalCount = 0
self .msgPump = MessagePump( self , port) # 消息传送器
self .instances = {}
self .hbListener = PaxosLeader.HeartbeatListener( self ) # 监听
self .hbSender = PaxosLeader.HeartbeatSender( self ) # 发送心跳
self .highestInstance = - 1 # 协议状态
self .stoped = True # 是否正在运行
self .lasttime = time.time() # 最后一次时间
def sendMessage( self , message):
self .msgPump.sendMessage(message)
def start( self ):
self .hbSender.start()
self .hbListener.start()
self .msgPump.start()
self .stoped = False
def stop( self ):
self .hbSender.doAbort()
self .hbListener.doAbort()
self .msgPump.doAbort()
self .stoped = True
def setPrimary( self , primary): # 设置领导者
if self .isPrimary ! = primary:
# Only print if something's changed
if primary:
print (u "我是leader%s" % self .port)
else :
print (u "我不是leader%s" % self .port)
self .isPrimary = primary
# 获取所有的领导下面的追随者
def getGroup( self ):
return self .group
def getLeaders( self ):
return self .leaders
def getAcceptors( self ):
return self .acceptors
# 必须获得1/2以上的人支持
def getQuorumSize( self ):
return ( len ( self .getAcceptors()) / 2 ) + 1
def getInstanceValue( self , instanceID):
if instanceID in self .instances:
return self .instances[instanceID].value
return None
def getHistory( self ): # 历史记录
return [ self .getInstanceValue(i) for i in range ( 1 , self .highestInstance + 1 )]
# 抓取同意的数量
def getNumAccpted( self ):
return len ([v for v in self .getHistory() if v ! = None ])
# 抓取空白时间处理下事务
def findAndFillGaps( self ):
for i in range ( 1 , self .highestInstance):
if self .getInstanceValue(i) = = None :
print ( "填充空白" , i)
self .newProposal( 0 , i)
self .lasttime = time.time()
# 采集无用信息
def garbageCollect( self ):
for i in self .instances:
self .instances[i].cleanProtocols()
# 通知领导
def recvMessage( self , message):
if self .stoped:
return
if message = = None :
if self .isPrimary and time.time() - self .lasttime > 15.0 :
self .findAndFillGaps()
self .garbageCollect()
return
#处理心跳信息
if message.command = = Message.MSG_HEARTBEAT:
self .hbListener.newHB(message)
return True
#处理额外的提议
if message.command = = Message.MSG_EXT_PROPOSE:
print ( "额外的协议" , self .port, self .highestInstance)
if self .isPrimary:
self .newProposal(message.value)
return True
if self .isPrimary and message.command ! = Message.MSG_ACCEPTOR_ACCEPT:
self .instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
if message.command = = Message.MSG_ACCEPTOR_ACCEPT:
if message.instanceID not in self .instances:
self .instances[message.instanceID] = InstanceRecord()
record = self .instances[message.instanceID]
if message.proposalID not in record.protocols: #创建协议
protocol = PaxosLeaderProtocol( self )
protocol.state = PaxosLeaderProtocol.STATE_AGREED
protocol.proposalID = message.proposalID
protocol.instanceID = message.instanceID
protocol.value = message.value
record.addProtocol(protocol)
else :
protocol = record.getProtocol(message.proposalID)
protocol.doTransition(message)
return True
# 新建提议
def newProposal( self , value, instance = None ):
protocol = PaxosLeaderProtocol( self )
if instance = = None : # 创建协议标号
self .highestInstance + = 1
instanceID = self .highestInstance
else :
instanceID = instance
self .proposalCount + = 1
id = ( self .port, self .proposalCount)
if instanceID in self .instances:
record = self .instances[instanceID]
else :
record = InstanceRecord()
self .instances[instanceID] = record
protocol.propose(value, id , instanceID)
record.addProtocol(protocol)
def notifyLeader( self , protocol, message):
if protocol.state = = PaxosLeaderProtocol.STATE_ACCEPTED:
print ( "协议接口%s被%s接受" % (message.instanceID, message.value))
self .instances[message.instanceID].accepted = True
self .instances[message.instanceID].value = message.value
self .highestInstance = max (message.instanceID, self .highestInstance)
return
if protocol.state = = PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试
self .proposalCount = max ( self .proposalCount, message.highestPID[ 1 ])
self .newProposal(message.value)
return True
if protocol.state = = PaxosLeaderProtocol.STATE_UNACCEPTED:
pass
|
LeaderProtocol实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
from Message import Message
class PaxosLeaderProtocol( object ):
STATE_UNDEFINED = - 1 # 协议没有定义的情况0
STATE_PROPOSED = 0 # 协议消息
STATE_REJECTED = 1 # 拒绝链接
STATE_AGREED = 2 # 同意链接
STATE_ACCEPTED = 3 # 同意请求
STATE_UNACCEPTED = 4 # 拒绝请求
def __init__( self , leader):
self .leader = leader
self .state = PaxosLeaderProtocol.STATE_UNDEFINED
self .proposalID = ( - 1 , - 1 )
self .agreecount, self .acceptcount = ( 0 , 0 )
self .rejectcount, self .unacceptcount = ( 0 , 0 )
self .instanceID = - 1
self .highestseen = ( 0 , 0 )
# 提议
def propose( self , value, pID, instanceID):
self .proposalID = pID
self .value = value
self .instanceID = instanceID
message = Message(Message.MSG_PROPOSE)
message.proposalID = pID
message.instanceID = instanceID
message.value = value
for server in self .leader.getAcceptors():
message.to = server
self .leader.sendMessage(message)
self .state = PaxosLeaderProtocol.STATE_PROPOSED
return self .proposalID
# 過度
def doTransition( self , message):
# 根據狀態運行協議
if self .state = = PaxosLeaderProtocol.STATE_PROPOSED:
if message.command = = Message.MSG_ACCEPTOR_AGREE:
self .agreecount + = 1
if self .agreecount > = self .leader.getQuorumSize(): # 选举
print (u "达成协议的法定人数,最后的价值回答是:%s" % message.value)
if message.value ! = None :
if message.sequence[ 0 ] > self .highestseen[ 0 ] or (
message.sequence[ 0 ] = = self .highestseen[ 0 ] and message.sequence[ 1 ] > self .highestseen[
1 ]):
self .value = message.value
self .highestseen = message.sequence
self .state = PaxosLeaderProtocol.STATE_AGREED # 同意更新
# 发送同意消息
msg = Message(Message.MSG_ACCEPT)
msg.copyAsReply(message)
msg.value = self .value
msg.leaderID = msg.to
for server in self .leader.getAcceptors():
msg.to = server
self .leader.sendMessage(msg)
self .leader.notifyLeader( self , message)
return True
if message.command = = Message.MSG_ACCEPTOR_REJECT:
self .rejectcount + = 1
if self .rejectcount > = self .leader.getQuorumSize():
self .state = PaxosLeaderProtocol.STATE_REJECTED
self .leader.notifyLeader( self , message)
return True
if self .state = = PaxosLeaderProtocol.STATE_AGREED:
if message.command = = Message.MSG_ACCEPTOR_ACCEPT: # 同意协议
self .acceptcount + = 1
if self .acceptcount > = self .leader.getQuorumSize():
self .state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受
self .leader.notifyLeader( self , message)
if message.command = = Message.MSG_ACCEPTOR_UNACCEPT:
self .unacceptcount + = 1
if self .unacceptcount > = self .leader.getQuorumSize():
self .state = PaxosLeaderProtocol.STATE_UNACCEPTED
self .leader.notifyLeader( self , message)
|
测试模块:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader
if __name__ = = "__main__" :
# 设定5个客户端
numclients = 5
clients = [PaxosAcceptor(port, [ 54321 , 54322 ]) for port in range ( 64320 , 64320 + numclients)]
# 两个领导者
leader1 = PaxosLeader( 54321 , [ 54322 ], [c.port for c in clients])
leader2 = PaxosLeader( 54322 , [ 54321 ], [c.port for c in clients])
# 开启领导者与追随者
leader1.start()
leader1.setPrimary( True )
leader2.setPrimary( True )
leader2.start()
for c in clients:
c.start()
# 破坏,客户端不链接
clients[ 0 ].fail()
clients[ 1 ].fail()
# 通信
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议
start = time.time()
for i in range ( 1000 ):
m = Message(Message.MSG_EXT_PROPOSE) # 消息
m.value = 0 + i # 消息参数
m.to = 54322 # 设置传递的端口
bytes = pickle.dumps(m) # 提取的二进制数据
s.sendto(bytes, ( "localhost" , m.to)) # 发送消息
while leader2.getNumAccpted() < 999 :
print ( "休眠的这一秒 %d " % leader2.getNumAccpted())
time.sleep( 1 )
print (u "休眠10秒" )
time.sleep( 10 )
print (u "停止leaders" )
leader1.stop()
leader2.stop()
print (u "停止客户端" )
for c in clients:
c.stop()
print (u "leader1历史纪录" )
print (leader1.getHistory())
print (u "leader2历史纪录" )
print (leader2.getHistory())
end = time.time()
print (u "一共用了%f秒" % (end - start))
|
代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/baidu_17508977/article/details/80741916