基于python的Paxos算法实现

时间:2022-11-30 17:00:01

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。

基于python的Paxos算法实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Message:
  # command
  MSG_ACCEPTOR_AGREE = 0 # 追随者约定
  MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受
  MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通
  MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意
  MSG_ACCEPT = 4 # 接受
  MSG_PROPOSE = 5 # 提议
  MSG_EXT_PROPOSE = 6 # 额外提议
  MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息
  def __init__(self, command=None):
    self.command = command
  # 把收到的消息原原路返回,作为应答消息
  def copyAsReply(self, message):
    # 提议ID #当前的ID #发给谁 #谁发的
    self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
    self.value = message.value # 发的信息

然后是利用socket,线程和队列实现的消息处理器:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 基于socket传递消息,封装网络传递消息
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
  # 收取消息线程
  class MPHelper(threading.Thread):
    #
    def __init__(self, owner):
      self.owner = owner
      threading.Thread.__init__(self)
    def run(self):
      while not self.owner.abort: # 只要所有者线程没有结束,一直接受消息
        try:
          (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息
          msg = pickle.loads(bytes) # 读取二进制数据转化为消息
          msg.source = addr[1]
          self.owner.queue.put(msg) # 队列存入消息
        except Exception as e:
          pass
 
  def __init__(self, owner, port, timeout=2):
    threading.Thread.__init__(self)
    self.owner = owner
    self.abort = False
    self.timeout = 2
    self.port = port
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) # 通信参数
    self.socket.bind(("localhost", port)) # 通信地址,ip,端口
    self.socket.settimeout(timeout) # 超时设置
    self.queue = queue.Queue() # 队列
    self.helper = MessagePump.MPHelper(self) # 接收消息
 
  # 运行主线程
  def run(self):
    self.helper.start() # 开启收消息的线程
    while not self.abort:
      message = self.waitForMessage() # 阻塞等待
      self.owner.recvMessage(message) # 收取消息
 
  # 等待消息
  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 3) # 抓取数据,最多等待3s
      return msg
    except:
      return None
 
  # 发送消息
  def sendMessage(self, message):
    bytes = pickle.dumps(message) # 转化为二进制
    address = ("localhost", message.to) # 地址ip,端口(ip,port)
    self.socket.sendto(bytes, address)
    return True
  #是否停止收取消息
  def doAbort(self):
    self.abort = True

再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 类的继承
  # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
  def __init__(self, owner, port, timeout=2):
    MessagePump.__init__(self, owner, port, timeout) # 初始化父类
    self.messages = set() # 集合避免重复
 
  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 0.1) # 从队列抓取数据
      self.messages.add(msg) # 添加消息
    except Exception as e: # 处理异常
      pass
      # print(e)
    if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
      msg = random.choice(list(self.messages)) # 随机抓取消息发送
      self.messages.remove(msg) # 删除消息
    else:
      msg = None
    return msg

再来一个是记录类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
  def __init__(self):
    self.protocols = {}
    self.highestID = (-1, -1) # (port,count)
    self.value = None
 
  def addProtocol(self, protocol):
    self.protocols[protocol.proposalID] = protocol
    #
    if protocol.proposalID[1] > self.highestID[1] or (
        protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
      self.highestID = protocol.proposalID # 取得编号最大的协议
 
  def getProtocol(self, protocolID):
    return self.protocols[protocolID]
 
  def cleanProtocols(self):
    keys = self.protocols.keys()
    for k in keys:
      protocol = self.protocols[k]
      if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
        print("删除协议")
        del self.protocols[k]

下面就是Acceptor的实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 追随者
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
  def __init__(self, port, leaders):
    self.port = port
    self.leaders = leaders
    self.instances = {} # 接口列表
    self.msgPump = MessagePump(self, self.port) # 消息传递器
    self.failed = False
 
  # 开始消息传送
  def start(self):
    self.msgPump.start()
 
  # 停止
  def stop(self):
    self.msgPump.doAbort()
 
  # 失败
  def fail(self):
    self.failed = True
 
  def recover(self):
    self.failed = False
 
  # 发送消息
  def sendMessage(self, message):
    self.msgPump.sendMessage(message)
 
  # 收消息,只收取为提议的消息
  def recvMessage(self, message):
    if message == None:
      return
    if self.failed: # 失败状态不收取消息
      return
 
    if message.command == Message.MSG_PROPOSE: # 判断消息是否为提议
      if message.instanceID not in self.instances:
        record = InstanceRecord() # 记录器
        self.instances[message.instanceID] = record
      protocol = PaxosAcceptorProtocol(self) # 创建协议
      protocol.recvProposal(message) # 收取消息
      self.instances[message.instanceID].addProtocol(protocol)
    else:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
 
  # 通知客户端,
  def notifyClient(self, protocol, message):
    if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知
      self.instances[protocol.instanceID].value = message.value # 储存信息
      print(u"协议被客户端接受 %s" % message.value)
 
  # 获取最高同意的建议
  def getHighestAgreedProposal(self, instance):
    return self.instances[instance].highestID # (port,count)
 
  # 获取接口数据
  def getInstanceValue(self, instance):
    return self.instances[instance].value

那再看下AcceptorProtocol的实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from Message import Message
class PaxosAcceptorProtocol(object):
  # State variables
  STATE_UNDEFINED = -1 # 协议没有定义的情况0
  STATE_PROPOSAL_RECEIVED = 0 # 收到消息
  STATE_PROPOSAL_REJECTED = 1 # 拒绝链接
  STATE_PROPOSAL_AGREED = 2 # 同意链接
  STATE_PROPOSAL_ACCEPTED = 3 # 同意请求
  STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求
 
  def __init__(self, client):
    self.client = client
    self.state = PaxosAcceptorProtocol.STATE_UNDEFINED
 
  # 收取,只处理协议类型的消息
  def recvProposal(self, message):
 
    if message.command == Message.MSG_PROPOSE: # 协议
      self.proposalID = message.proposalID
      self.instanceID = message.instanceID
      (port, count) = self.client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号
      # 检测编号处理消息协议
      # 判断协议是否最高
      if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意
        print("同意协议:%s, %s " % (message.instanceID, message.value))
        value = self.client.getInstanceValue(message.instanceID)
        msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议
        msg.copyAsReply(message)
        msg.value = value
        msg.sequence = (port, count)
        self.client.sendMessage(msg) # 发送消息
      else: # 不再接受比最高协议小的提议
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
      return self.proposalID
    else:
      # 错误重试
      pass
  # 过度
  def doTransition(self, message): # 如果当前协议状态是接受连接,消息类型是接受
    if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议
      msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息
      msg.copyAsReply(message) # 拷贝并回复
      for l in self.client.leaders:
        msg.to = l
        self.client.sendMessage(msg) # 给领导发送消息
      self.notifyClient(message) # 通知自己
      return True
    raise Exception("并非预期的状态和命令")
 
  # 通知 自己客户端
  def notifyClient(self, message):
    self.client.notifyClient(self, message)

接着看下Leader和LeaderProtocol实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# 领导者
import threading
import Queue
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
  # 定时监听
  class HeartbeatListener(threading.Thread):
    def __init__(self, leader):
      self.leader = leader
      self.queue = Queue.Queue() # 消息队列
      self.abort = False
      threading.Thread.__init__(self)
 
    def newHB(self, message):
      self.queue.put(message)
 
    def doAbort(self):
      self.abort = True
 
    def run(self): # 读取消息
      elapsed = 0
      while not self.abort:
        s = time.time()
        try:
          hb = self.queue.get(True, 2)
          # 设定规则,谁的端口号比较高,谁就是领导
          if hb.source > self.leader.port:
            self.leader.setPrimary(False)
        except:
          self.leader.setPrimary(True)
 
  # 定时发送
  class HeartbeatSender(threading.Thread):
    def __init__(self, leader):
      threading.Thread.__init__(self)
      self.leader = leader
      self.abort = False
    def doAbort(self):
      self.abort = True
    def run(self):
      while not self.abort:
        time.sleep(1)
        if self.leader.isPrimary:
          msg = Message(Message.MSG_HEARTBEAT)
          msg.source = self.leader.port
          for leader in self.leader.leaders:
            msg.to = leader
            self.leader.sendMessage(msg)
 
  def __init__(self, port, leaders=None, acceptors=None):
    self.port = port
    if leaders == None:
      self.leaders = []
    else:
      self.leaders = leaders
    if acceptors == None:
      self.acceptors = []
    else:
      self.acceptors = acceptors
    self.group = self.leaders + self.acceptors # 集合合并
    self.isPrimary = False # 自身是不是领导
    self.proposalCount = 0
    self.msgPump = MessagePump(self, port) # 消息传送器
    self.instances = {}
    self.hbListener = PaxosLeader.HeartbeatListener(self) # 监听
    self.hbSender = PaxosLeader.HeartbeatSender(self) # 发送心跳
    self.highestInstance = -1 # 协议状态
    self.stoped = True # 是否正在运行
    self.lasttime = time.time() # 最后一次时间
 
  def sendMessage(self, message):
    self.msgPump.sendMessage(message)
 
  def start(self):
    self.hbSender.start()
    self.hbListener.start()
    self.msgPump.start()
    self.stoped = False
 
  def stop(self):
    self.hbSender.doAbort()
    self.hbListener.doAbort()
    self.msgPump.doAbort()
    self.stoped = True
 
  def setPrimary(self, primary): # 设置领导者
    if self.isPrimary != primary:
      # Only print if something's changed
      if primary:
        print(u"我是leader%s" % self.port)
      else:
        print(u"我不是leader%s" % self.port)
    self.isPrimary = primary
 
  # 获取所有的领导下面的追随者
  def getGroup(self):
    return self.group
 
  def getLeaders(self):
    return self.leaders
 
  def getAcceptors(self):
    return self.acceptors
 
  # 必须获得1/2以上的人支持
  def getQuorumSize(self):
    return (len(self.getAcceptors()) / 2) + 1
 
  def getInstanceValue(self, instanceID):
    if instanceID in self.instances:
      return self.instances[instanceID].value
    return None
 
  def getHistory(self): # 历史记录
    return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]
 
  # 抓取同意的数量
  def getNumAccpted(self):
    return len([v for v in self.getHistory() if v != None])
 
  # 抓取空白时间处理下事务
  def findAndFillGaps(self):
    for i in range(1, self.highestInstance):
      if self.getInstanceValue(i) == None:
        print("填充空白", i)
        self.newProposal(0, i)
    self.lasttime = time.time()
 
  # 采集无用信息
  def garbageCollect(self):
    for i in self.instances:
      self.instances[i].cleanProtocols()
 
  # 通知领导
  def recvMessage(self, message):
    if self.stoped:
      return
    if message == None:
      if self.isPrimary and time.time() - self.lasttime > 15.0:
        self.findAndFillGaps()
        self.garbageCollect()
      return
    #处理心跳信息
    if message.command == Message.MSG_HEARTBEAT:
      self.hbListener.newHB(message)
      return True
    #处理额外的提议
    if message.command == Message.MSG_EXT_PROPOSE:
      print("额外的协议", self.port, self.highestInstance)
      if self.isPrimary:
        self.newProposal(message.value)
      return True
 
    if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
 
    if message.command == Message.MSG_ACCEPTOR_ACCEPT:
      if message.instanceID not in self.instances:
        self.instances[message.instanceID] = InstanceRecord()
      record = self.instances[message.instanceID]
      if message.proposalID not in record.protocols:#创建协议
        protocol = PaxosLeaderProtocol(self)
        protocol.state = PaxosLeaderProtocol.STATE_AGREED
        protocol.proposalID = message.proposalID
        protocol.instanceID = message.instanceID
        protocol.value = message.value
        record.addProtocol(protocol)
      else:
        protocol = record.getProtocol(message.proposalID)
 
      protocol.doTransition(message)
 
    return True
  # 新建提议
  def newProposal(self, value, instance=None):
    protocol = PaxosLeaderProtocol(self)
    if instance == None: # 创建协议标号
      self.highestInstance += 1
      instanceID = self.highestInstance
    else:
      instanceID = instance
    self.proposalCount += 1
    id = (self.port, self.proposalCount)
    if instanceID in self.instances:
      record = self.instances[instanceID]
    else:
      record = InstanceRecord()
      self.instances[instanceID] = record
    protocol.propose(value, id, instanceID)
    record.addProtocol(protocol)
 
  def notifyLeader(self, protocol, message):
    if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
      print("协议接口%s被%s接受" % (message.instanceID, message.value))
      self.instances[message.instanceID].accepted = True
      self.instances[message.instanceID].value = message.value
      self.highestInstance = max(message.instanceID, self.highestInstance)
      return
    if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试
      self.proposalCount = max(self.proposalCount, message.highestPID[1])
      self.newProposal(message.value)
      return True
    if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
      pass

LeaderProtocol实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from Message import Message
class PaxosLeaderProtocol(object):
  STATE_UNDEFINED = -1 # 协议没有定义的情况0
  STATE_PROPOSED = 0 # 协议消息
  STATE_REJECTED = 1 # 拒绝链接
  STATE_AGREED = 2 # 同意链接
  STATE_ACCEPTED = 3 # 同意请求
  STATE_UNACCEPTED = 4 # 拒绝请求
  def __init__(self, leader):
    self.leader = leader
    self.state = PaxosLeaderProtocol.STATE_UNDEFINED
    self.proposalID = (-1, -1)
    self.agreecount, self.acceptcount = (0, 0)
    self.rejectcount, self.unacceptcount = (0, 0)
    self.instanceID = -1
    self.highestseen = (0, 0)
  # 提议
  def propose(self, value, pID, instanceID):
    self.proposalID = pID
    self.value = value
    self.instanceID = instanceID
    message = Message(Message.MSG_PROPOSE)
    message.proposalID = pID
    message.instanceID = instanceID
    message.value = value
    for server in self.leader.getAcceptors():
      message.to = server
      self.leader.sendMessage(message)
    self.state = PaxosLeaderProtocol.STATE_PROPOSED
 
    return self.proposalID
 
  # 過度
  def doTransition(self, message):
    # 根據狀態運行協議
    if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
      if message.command == Message.MSG_ACCEPTOR_AGREE:
        self.agreecount += 1
        if self.agreecount >= self.leader.getQuorumSize(): # 选举
          print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)
          if message.value != None:
            if message.sequence[0] > self.highestseen[0] or (
                message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
              1]):
              self.value = message.value
              self.highestseen = message.sequence
 
            self.state = PaxosLeaderProtocol.STATE_AGREED # 同意更新
            # 发送同意消息
            msg = Message(Message.MSG_ACCEPT)
            msg.copyAsReply(message)
            msg.value = self.value
            msg.leaderID = msg.to
            for server in self.leader.getAcceptors():
              msg.to = server
              self.leader.sendMessage(msg)
            self.leader.notifyLeader(self, message)
          return True
 
        if message.command == Message.MSG_ACCEPTOR_REJECT:
          self.rejectcount += 1
          if self.rejectcount >= self.leader.getQuorumSize():
            self.state = PaxosLeaderProtocol.STATE_REJECTED
            self.leader.notifyLeader(self, message)
          return True
 
    if self.state == PaxosLeaderProtocol.STATE_AGREED:
      if message.command == Message.MSG_ACCEPTOR_ACCEPT: # 同意协议
        self.acceptcount += 1
        if self.acceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受
          self.leader.notifyLeader(self, message)
      if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
        self.unacceptcount += 1
        if self.unacceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
          self.leader.notifyLeader(self, message)

测试模块:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader
 
if __name__ == "__main__":
  # 设定5个客户端
  numclients = 5
  clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
  # 两个领导者
  leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
  leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])
 
  # 开启领导者与追随者
  leader1.start()
  leader1.setPrimary(True)
  leader2.setPrimary(True)
  leader2.start()
  for c in clients:
    c.start()
 
  # 破坏,客户端不链接
  clients[0].fail()
  clients[1].fail()
 
  # 通信
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议
  start = time.time()
  for i in range(1000):
    m = Message(Message.MSG_EXT_PROPOSE) # 消息
    m.value = 0 + i # 消息参数
    m.to = 54322 # 设置传递的端口
    bytes = pickle.dumps(m) # 提取的二进制数据
    s.sendto(bytes, ("localhost", m.to)) # 发送消息
 
  while leader2.getNumAccpted() < 999:
    print("休眠的这一秒 %d " % leader2.getNumAccpted())
    time.sleep(1)
  print(u"休眠10秒")
  time.sleep(10)
  print(u"停止leaders")
  leader1.stop()
  leader2.stop()
  print(u"停止客户端")
  for c in clients:
    c.stop()
  print(u"leader1历史纪录")
  print(leader1.getHistory())
  print(u"leader2历史纪录")
  print(leader2.getHistory())
  end = time.time()
  print(u"一共用了%f秒" % (end - start))

代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/baidu_17508977/article/details/80741916