Distributed Protocol Study Notes (1): Raft Election

Time: 2023-05-20 15:53:20

Raft official website

Chinese translation of the paper

Original paper (English)

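Raft feels like an upgraded, streamlined Paxos. It was designed from the start to be easy to understand, and after reading the material I already had a rough picture of it in my head.

Detailed material, and even animated demos, already exist, so there is not much left to add. This post only records my learning points as a reminder for later review.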

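In a distributed system, consistency means that the nodes of a cluster agree on their state. In practice, program crashes, network failures, hardware faults, power outages and similar events make consistency between nodes hard to guarantee. This is why consensus protocols such as Paxos and Raft are needed.

Paxos was proposed by Leslie Lamport in 1990. It is a highly fault-tolerant consensus algorithm based on message passing. However, Paxos has two obvious drawbacks. First, it is hard to understand. Second, it does not provide a good foundation for building real systems.

There have been many attempts to turn Paxos into working systems. These attempts changed the algorithm substantially, and the resulting implementations differ greatly from one another.

Raft is a consensus algorithm for managing a replicated log. Its designers made understandability one of their explicit goals. As a result, real systems are easier to build on Raft, and much less engineering work is needed.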

1 Leader Election

Raft uses a model with one Leader node and several Follower nodes, commonly called the Leader-Follower model. Each node is in one of three states: Leader, Follower, or Candidate.
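Below is a minimal C++ sketch of the three states and the transitions that leader election drives between them. The `Role` and `Event` names are illustrative assumptions. They do not come from the Raft paper or from the demo code later in this post.

```cpp
#include <cassert>

// The three states a Raft node can be in.
enum class Role { Follower, Candidate, Leader };

// Events that drive state changes during leader election (hypothetical names).
enum class Event {
    ElectionTimeout,  // no heartbeat arrived before the election timeout fired
    WonMajority,      // the candidate collected votes from more than half the cluster
    SawHigherTerm,    // any message carrying a larger term than ours
    LeaderHeartbeat   // heartbeat from a leader whose term is >= ours
};

// Returns the state after handling `ev` in state `r`.
// Events that do not apply to a state leave it unchanged.
Role next_role(Role r, Event ev) {
    switch (ev) {
    case Event::SawHigherTerm:
        return Role::Follower;  // every state steps down on a newer term
    case Event::ElectionTimeout:
        // a follower starts an election; a candidate starts another one;
        // a leader does not run an election timer
        return r == Role::Leader ? Role::Leader : Role::Candidate;
    case Event::WonMajority:
        return r == Role::Candidate ? Role::Leader : r;
    case Event::LeaderHeartbeat:
        return r == Role::Leader ? r : Role::Follower;
    }
    assert(false && "unknown event");
    return r;
}
```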

The Leader handles client requests and replicates the results to the Follower nodes in the form of log entries.
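This post only covers election. Still, the following sketch shows the rule a Leader can use to decide how far its log has been replicated: an index counts once a majority of nodes, the Leader included, store it. This is an addition beyond the post and not part of the demo code.

The function takes `match`, which holds the highest log index known to be stored on each node. In full Raft, the Leader additionally commits by counting replicas only for entries from its current term.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

// match[i]: highest log index known to be stored on node i (Leader included);
// 0 means nothing stored yet. Returns the highest index stored on a majority.
int majority_replicated_index(std::vector<int> match) {
    assert(!match.empty());
    // Sort descending: the value at position n/2 is <= the values held by the
    // first n/2 + 1 nodes, which is a strict majority of n.
    std::sort(match.begin(), match.end(), std::greater<int>());
    return match[match.size() / 2];
}
```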

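Raft uses two timers to control the progress of leader election.

One is the heartbeat: the Leader periodically sends heartbeat messages to the Followers.

The other is the election timeout: how long a Follower waits before it becomes a Candidate.

The election timeout is usually a random value between 150 ms and 300 ms. The randomness makes it unlikely that several nodes become Candidates at the same time.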
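The sketch below draws the randomized election timeout with `<random>`, using the 150-300 ms range above. The 50 ms heartbeat interval is an assumed value. What matters is that it stays well below the smallest election timeout, and the `static_assert` checks exactly that.

```cpp
#include <cassert>
#include <chrono>
#include <random>

using Millis = std::chrono::milliseconds;

constexpr Millis kHeartbeatInterval{50};   // assumed value, not from the text
constexpr Millis kElectionTimeoutMin{150};
constexpr Millis kElectionTimeoutMax{300};

// Followers must hear from a healthy leader before any of them can time out.
static_assert(kHeartbeatInterval < kElectionTimeoutMin,
              "heartbeat interval must be shorter than the election timeout");

// Draws a fresh election timeout uniformly from [min, max].
// A node re-draws it every time it resets its election timer.
Millis random_election_timeout(std::mt19937& rng) {
    std::uniform_int_distribution<Millis::rep> dist(kElectionTimeoutMin.count(),
                                                    kElectionTimeoutMax.count());
    Millis t{dist(rng)};
    assert(t >= kElectionTimeoutMin && t <= kElectionTimeoutMax);
    return t;
}
```

Because each node seeds its own generator, two nodes that both lost a leader will almost always time out at different moments.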

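If a node receives no heartbeat from a Leader before its election timeout expires, it assumes there is no Leader and becomes a Candidate. It then increments its term, votes for itself, and sends vote requests to the other nodes.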
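Here is a sketch of what a Follower does when its election timeout fires. `NodeState`, `RequestVote`, and `start_election` are hypothetical names, and sending the messages over the network is left out.

```cpp
#include <cassert>
#include <vector>

struct NodeState {
    int id = 0;
    int currentTerm = 0;
    int votedFor = -1;         // -1: no vote cast in currentTerm
    bool isCandidate = false;
};

struct RequestVote {           // hypothetical message layout
    int term;
    int candidateId;
    int toId;
};

// Election timeout fired without a heartbeat: start a new term,
// vote for ourselves, and build one vote request per peer.
std::vector<RequestVote> start_election(NodeState& n, const std::vector<int>& peers) {
    n.currentTerm += 1;
    n.isCandidate = true;
    n.votedFor = n.id;         // the candidate's own vote
    std::vector<RequestVote> out;
    for (int p : peers) {
        assert(p != n.id);
        out.push_back({n.currentTerm, n.id, p});
    }
    return out;
}
```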

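When another node receives a vote request, it checks two things: whether the term carried in the request is greater than or equal to its own recorded term, and whether it has not yet voted in that term. If both hold, it grants the vote and resets its own election timeout.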
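The sketch below implements the voting rule above. `Voter` and `handle_request_vote` are hypothetical names, and a `votedFor` of -1 means no vote has been cast in the current term. Re-granting to the same candidate keeps a retried request harmless.

Full Raft also rejects a candidate whose log is less up to date than the voter's (the election restriction). That check is omitted here because logs are not modeled.

```cpp
#include <cassert>

struct Voter {
    int currentTerm = 0;
    int votedFor = -1;  // -1: no vote cast in currentTerm
};

// Handles RequestVote(term, candidateId); returns true if the vote is granted.
// On a grant the caller should also reset its election timer.
bool handle_request_vote(Voter& v, int term, int candidateId) {
    assert(candidateId >= 0);
    if (term < v.currentTerm)
        return false;               // request from a stale term
    if (term > v.currentTerm) {
        v.currentTerm = term;       // newer term: adopt it and forget the old vote
        v.votedFor = -1;
    }
    if (v.votedFor == -1 || v.votedFor == candidateId) {
        v.votedFor = candidateId;   // at most one candidate per term
        return true;
    }
    return false;
}
```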

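Once the Candidate holds more than half of the votes, it becomes the Leader. It then starts sending heartbeats to the Followers to maintain its authority.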
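The majority test can be sketched as counting distinct voters, the candidate's own vote included, and requiring strictly more than half of the cluster. For example, a 4-node cluster needs 3 votes, so a 2-2 split elects nobody. `has_majority` is a hypothetical helper name.

```cpp
#include <cassert>
#include <set>

// True if `votes` (distinct voter IDs, the candidate included) is a strict
// majority of a cluster with `clusterSize` nodes.
bool has_majority(const std::set<int>& votes, int clusterSize) {
    assert(clusterSize > 0);
    return static_cast<int>(votes.size()) > clusterSize / 2;
}
```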

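Now consider several nodes running for election at once. Nodes B and D start elections at the same time, and each wins the vote of one other node, so neither reaches a majority. The randomized election timeout breaks the tie. Each node draws a new random timeout (typically 150 ms to 300 ms), so it is unlikely that B and D will become Candidates at the same moment again.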

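Eventually, possibly after several rounds (repeated split votes are unlikely), one node gets more than half of the votes and becomes the Leader.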
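The function below is a toy model of why randomized timeouts resolve split votes. It rests on a simplifying assumption of my own, not the protocol: a round is split when the two earliest timeouts fire within `rttMs` of each other, so both nodes become Candidates before either node's vote request arrives. Otherwise the earliest node wins. The function returns how many rounds it took.

```cpp
#include <algorithm>
#include <cassert>
#include <random>
#include <utility>
#include <vector>

// Toy model (assumption): every round each node draws a timeout in [150, 300] ms.
// If the two earliest fire less than rttMs apart, the vote splits and all nodes
// re-draw; otherwise the earliest node wins. `winner` receives its id.
int rounds_until_leader(int nodes, int rttMs, std::mt19937& rng, int& winner) {
    assert(nodes >= 2 && rttMs >= 0 && rttMs < 150);
    std::uniform_int_distribution<int> timeout(150, 300);
    for (int round = 1;; ++round) {
        std::vector<std::pair<int, int>> fire;  // (timeout in ms, node id)
        for (int id = 0; id < nodes; ++id)
            fire.emplace_back(timeout(rng), id);
        std::sort(fire.begin(), fire.end());    // earliest timeout first
        if (fire[1].first - fire[0].first >= rttMs) {
            winner = fire[0].second;            // its requests arrive before anyone else times out
            return round;
        }
        // split vote: nobody reaches a majority, try again with fresh timeouts
    }
}
```

A wider timeout range relative to `rttMs` makes split rounds rarer, which matches the intuition behind choosing 150-300 ms.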

#pragma once
#include <iostream>
#include <fstream>
#include <cassert>
#include <string>
#include <vector>
#include <map>
using namespace std;
/*
 * Author: itdef
 * Reposting is welcome; please keep the text intact and credit the source.
 * Tech blog: http://www.cnblogs.com/itdef/
 * Discussion group (QQ): 432336863
 * C/C++, Windows driver, and server developers are welcome to get in touch.
 * Some older code is kept at:
 * http://www.oschina.net/code/list_by_user?id=614253
 */
const string FILE_NAME = "config.txt";

// Reads "key=value" lines from a text file into a map.
// Lines starting with '#' or "//" are treated as comments.
class ReadConfig {
public:
    ReadConfig(string filename = "") {
        if (filename.empty()) {
            file_name = FILE_NAME;
        }
        else {
            file_name = filename;
        }
    }
    ~ReadConfig() {}

    map<string, string> Do() {
        tar_path.clear();
        ifstream fin;
        fin.open(file_name);
        if (false == fin.is_open()) {
            std::cerr << "open file failed!!" << std::endl;
            return tar_path;
        }
        string s;
        while (getline(fin, s))
        {
            // skip empty lines and comment lines ("#..." or "//...")
            if (s.empty() || '#' == s[0] || ('/' == s[0] && s.size() > 1 && '/' == s[1]))
                continue;
            size_t pos = s.find_first_of("=");
            // skip lines without '=' or with nothing after it
            if (pos == std::string::npos || pos + 1 >= s.size())
                continue;
            string targetName = s.substr(0, pos);
            string path = s.substr(pos + 1);
            std::cout << targetName << " = " << path << std::endl;
            // values starting with a space are ignored
            if (path[0] != ' ')
                tar_path[targetName] = path;
        }
        fin.close();
        return tar_path;
    }
private:
    map<string, string> tar_path;
    string file_name;
};

ReadConfig.h
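`RaftManager::Init()` reads a file named `nodeCfg` from the working directory. It requires the keys `ip`, `portStart`, and `nodeID`; `heartbeatTime` is optional. The helper below is hypothetical and writes such a file for one node in the format `ReadConfig::Do()` accepts.

It uses `portStart=9920` so that `portStart + nodeID` yields the ports 9921-9925 mentioned in `Go()`. The other values are example values. Because the file name is fixed, each node needs its own working directory.

```cpp
#include <cassert>
#include <fstream>
#include <string>

// Hypothetical helper: writes a "key=value" file that ReadConfig::Do() can parse
// ('#' starts a comment, no space after '='). Keys match RaftManager::Init().
bool write_node_cfg(const std::string& path, int nodeID) {
    assert(nodeID >= 1 && nodeID <= 5);  // the demo assumes node IDs 1..5
    std::ofstream out(path);
    if (!out)
        return false;
    out << "# demo node configuration (example values)\n"
        << "ip=127.0.0.1\n"
        << "portStart=9920\n"            // node N listens on portStart + N
        << "nodeID=" << nodeID << "\n"
        << "heartbeatTime=1000\n";
    out.flush();
    return static_cast<bool>(out);
}
```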

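#pragma once
#include <string>
#include <mutex>
#include <map>

// State (role) of a node
enum STATUS {
    LEADER_STATUS = 0,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,
};

// Type of a message exchanged between nodes
enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,
    VOTE_LEADER_TYPE,
    VOTE_LEADER_RESP_TYPE,
};

// Message sent between nodes (transferred as raw bytes)
typedef struct netInfo {
    int fromID;
    int toID;
    INFOTYPE infotype;
    int term;
    int voteId; // ID being voted for; only valid when infotype is a vote type
} NetInfo;

// Local state of this node
typedef struct locaInfo {
    int id;
    int leaderID;
    STATUS status;
    int term;
    int isVote;          // whether this node has voted in the current term
    int IsRecvHeartbeat; // whether a heartbeat arrived during the current timeout period
    std::map<int, int> voteRecord; // id -> term: an entry means node id voted for this node in that term
} LocalInfo;

// Local state together with the mutex that protects it
typedef struct localInfoWithLock {
    LocalInfo locInfo;
    std::mutex m;
} LocalInfoWithLock;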

CommonStruct.h

#pragma once
#include "CommonStruct.h"
#include "ReadConfig.h"
#include <memory>
#include <functional>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

class RaftManager : public enable_shared_from_this<RaftManager> {
public:
    // Singleton accessor. The constructor is private, so std::make_shared cannot be used.
    static std::shared_ptr<RaftManager> GetInstance() {
        if (p == nullptr)
            p.reset(new RaftManager());
        //p = std::make_shared<RaftManager>();
        return p;
    }
    ~RaftManager() {
        std::cout << "enter ~RaftManager()\n";
    }
    bool Init();
    bool Go();

private:
    boost::asio::io_service io_service;
    std::string ip;
    int portStart;
    int nodeID;
    int electionTimeout;
    int heartbeatTime;
    LocalInfoWithLock locInfolock;

    //=============================== send
    void DiapatchByStatus(int id, int& timeoutLimit);
    void HandleLeaderSend(int id, int& timeoutLimit);
    void HandleCandidateSend(int id, int& timeoutLimit);
    void HandleFollowerSend(int id, int& timeoutLimit);
    void HandlePreVoteSend(int id, int& timeoutLimit);

    //=================== recv
    void DiapatchByInfoType(const NetInfo& netinf);
    void HandleHeartbeatTypeRecv(const NetInfo& netinf);
    void HandleVoteTypeRecv(const NetInfo& netinf);
    void HandleVoteRespTypeRecv(const NetInfo& netinf);

    std::function<int()> dice; // returns a random election timeout in ms

    bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
    void Session(tcp::socket sock);
    void SendFunc(int id);

    RaftManager() {}
    RaftManager(const RaftManager&) = delete;
    RaftManager& operator=(const RaftManager&) = delete;
    static std::shared_ptr<RaftManager> p;
};

RaftManager.h

#include "RaftManager.h"
#include <random>
#include <functional>
#include <thread>
#include <chrono>

std::shared_ptr<RaftManager> RaftManager::p = nullptr;

bool RaftManager::Init() {
    // the configuration could also be read from JSON
    ReadConfig cfg("nodeCfg");
    map<string, string> kv = cfg.Do();
    if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
        assert(false);
        return false;
    }
    ip = kv["ip"];
    portStart = stoi(kv["portStart"]);
    nodeID = stoi(kv["nodeID"]);
    electionTimeout = 1500;
    heartbeatTime = 1000;
    if (kv.find("heartbeatTime") != kv.end())
        heartbeatTime = stoi(kv["heartbeatTime"]);

    locInfolock.locInfo.id = nodeID;
    locInfolock.locInfo.leaderID = 0;
    locInfolock.locInfo.term = 0;
    locInfolock.locInfo.IsRecvHeartbeat = 0;
    locInfolock.locInfo.isVote = 0;
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.voteRecord.clear();

    // dice() returns a random election timeout of 1500-5000 ms
    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(1500, 5000);
    dice = std::bind(dis, engine);
    return true;
}

// The send handlers run every 200 ms; timeoutLimit is the time (ms) left before the next action.
void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // todo: send a heartbeat (HEART_BREAT_TYPE) to node id
        timeoutLimit = heartbeatTime;
    }
}

void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // todo: send a vote request (VOTE_LEADER_TYPE) to node id
        timeoutLimit = dice();
    }
}

void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        timeoutLimit = dice();
    }
}

void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // check and update the local state under a single lock
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
            // no heartbeat within the election timeout: switch to election mode
            locInfolock.locInfo.term++;
            locInfolock.locInfo.status = CANDIDATE_STATUS;
            locInfolock.locInfo.isVote = 1; // voted for itself in the new term
            locInfolock.locInfo.voteRecord.clear();
            locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term;
        }
        else {
            // a heartbeat arrived: clear the flag and wait another period
            locInfolock.locInfo.IsRecvHeartbeat = 0;
        }
        timeoutLimit = dice();
    }
}

//===================
void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (netinf.fromID != locInfolock.locInfo.leaderID)
        locInfolock.locInfo.leaderID = netinf.fromID;
    locInfolock.locInfo.status = FOLLOWER_STATUS; // a candidate steps down when a leader of its term appears
    locInfolock.locInfo.IsRecvHeartbeat = 1;
}

void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    int voteid = netinf.fromID; // candidate that asked for the vote
    if (locInfolock.locInfo.isVote == 0) {
        // todo: reply to voteid with a granted vote
        locInfolock.locInfo.isVote = 1; // mark that this term's vote has been cast
    }
    else {
        // todo: reply to voteid with a rejected vote
    }
}

void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (locInfolock.locInfo.status != CANDIDATE_STATUS)
        return; // only a candidate counts votes
    if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) {
        // update the local record: fromID voted for this node in netinf.term
        locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
    }
    // count the votes received in the current term
    int count = 0;
    for (const auto& rec : locInfolock.locInfo.voteRecord) {
        if (rec.second == locInfolock.locInfo.term)
            count++;
    }
    if (count > 5 / 2) {
        // more than half of the 5 nodes: become leader; otherwise keep campaigning
        locInfolock.locInfo.leaderID = nodeID;
        locInfolock.locInfo.IsRecvHeartbeat = 1;
        locInfolock.locInfo.status = LEADER_STATUS;
    }
}

// loop send
void RaftManager::DiapatchByStatus(int id, int& timeoutLimit) {
    NetInfo netinf{ nodeID, id, DEFAULT_TYPE, 0, 0 }; // message to send (sending is still todo)
    LocalInfo localInfo;
    // copy the current state under the lock to decide what to send
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        localInfo = locInfolock.locInfo;
    }
    switch (localInfo.status) {
    case LEADER_STATUS:
        HandleLeaderSend(id, timeoutLimit);
        break;
    case FOLLOWER_STATUS:
        HandleFollowerSend(id, timeoutLimit);
        break;
    case CANDIDATE_STATUS:
        HandleCandidateSend(id, timeoutLimit);
        break;
    case PRE_VOTE_STAUS:
        HandlePreVoteSend(id, timeoutLimit);
        break;
    default:
        std::cerr << "unknown status!!" << std::endl;
    }
}

// handle recv
void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        // ignore messages from an older term
        if (netinf.term < locInfolock.locInfo.term)
            return;
        // newer term: adopt it and fall back to follower
        if (netinf.term > locInfolock.locInfo.term) {
            locInfolock.locInfo.term = netinf.term;
            locInfolock.locInfo.status = FOLLOWER_STATUS;
            locInfolock.locInfo.isVote = 0;
            locInfolock.locInfo.IsRecvHeartbeat = 0;
            locInfolock.locInfo.voteRecord.clear();
        }
    }
    //========================================
    switch (netinf.infotype) {
    case HEART_BREAT_TYPE:
        HandleHeartbeatTypeRecv(netinf);
        break;
    case VOTE_LEADER_TYPE:
        HandleVoteTypeRecv(netinf);
        break;
    case VOTE_LEADER_RESP_TYPE:
        HandleVoteRespTypeRecv(netinf);
        break;
    default:
        std::cerr << "Recv Unknown info type." << std::endl;
    }
}

bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
    int looptime = 200; // polling interval in ms
    int timeoutlimit = dice();
    while (true) {
        DiapatchByStatus(id, timeoutlimit);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }
    return false;
}

void RaftManager::SendFunc(int i) {
    // todo
    // example: poll every 200 ms, heartbeat interval 1000 ms, election timeout 1500-5000 ms
    std::string port = std::to_string(portStart + i); // node i listens on portStart + i
    int looptime = 1000; // retry interval in ms
    while (true) {
        std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>(io_service);
        tcp::resolver resolver(io_service);
        try {
            boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", port }));
        }
        catch (exception&) {
            // peer not reachable yet: wait, then keep retrying
            std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
            continue;
        }
        LoopCheck(i, s);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }
}

void RaftManager::Session(tcp::socket sock) {
    boost::system::error_code error;
    NetInfo netinf;
    while (true) {
        // read exactly sizeof(NetInfo) bytes; read_some could return a partial message
        size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
        if (error == boost::asio::error::eof)
            return; // Connection closed cleanly by peer.
        else if (error) {
            std::cerr << boost::system::system_error(error).what() << std::endl; // Some other error.
            return;
        }
        if (length != sizeof(netinf)) {
            std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
            return;
        }
        DiapatchByInfoType(netinf);
    }
}

bool RaftManager::Go() {
    // Set up networking. Broadcasting could be used to discover and notify the other nodes;
    // this demo assumes 5 nodes with IDs 1 2 3 4 5 and ports 9921 9922 9923 9924 9925.
    if (ip == "" || portStart == 0 || nodeID == 0)
        return false;
    try {
        // start 4 threads that send messages to the other nodes
        for (int i = 1; i <= 5; i++) {
            if (i == nodeID)
                continue;
            std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
            t.detach();
        }
        int port = portStart + nodeID;
        tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
        for (;;)
        {
            tcp::socket sock(io_service);
            a.accept(sock);
            // one receiving thread per incoming connection
            std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
        }
    }
    catch (exception& e) {
        std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
        return false;
    }
    return true;
}

RaftManager.cpp

// QueueTemplate.cpp : this file contains the "main" function; program execution begins and ends there.
// #include "pch.h"
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
using namespace std;

// Bounded blocking queue built on one mutex and two condition variables
template<typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) : m_maxSize(maxSize), m_needStop(false)
    {
    }

    void Put(const T& x)
    {
        Add(x);
    }

    void Put(T&& x)
    {
        Add(std::forward<T>(x));
    }

    // take all queued elements at once
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
        if (m_needStop)
            return;
        list = std::move(m_queue);
        m_queue.clear();        // a moved-from list is only guaranteed to be valid, not empty
        m_notFull.notify_all(); // several slots may have been freed
    }

    // take a single element
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
        if (m_needStop)
            return;
        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == static_cast<size_t>(m_maxSize);
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    int Count()
    {
        std::lock_guard<std::mutex> locker(m_mutex); // locked like Size() to avoid a data race
        return static_cast<int>(m_queue.size());
    }

private:
    bool NotFull() const
    {
        bool full = m_queue.size() >= static_cast<size_t>(m_maxSize);
        if (full)
            cout << "full, waiting, thread id: " << this_thread::get_id() << endl;
        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
            cout << "empty, waiting, thread id: " << this_thread::get_id() << endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] { return m_needStop || NotFull(); });
        if (m_needStop)
            return;
        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue;               // buffer
    std::mutex m_mutex;                 // mutex, used together with the condition variables
    std::condition_variable m_notEmpty; // condition: queue is not empty
    std::condition_variable m_notFull;  // condition: queue is not full
    int m_maxSize;                      // maximum size of the queue
    bool m_needStop;                    // stop flag
};

int main()
{
    std::cout << "Hello World!\n";
    SyncQueue<int> q(5);
    q.Put(1);
    int a = 0;
    q.Take(a);
    q.Put(2);
    q.Take(a);
    q.Stop();
}

syncqueue.h

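I tried to build a simplified Raft election demo myself.

The implementation defines 2-5 nodes and reads each node's IP, port, and node ID from a configuration file.

Networking uses synchronous Boost.Asio calls.

One thread receives, and four threads send.

1 The receiving thread inspects each incoming message and determines whether it is a heartbeat, a vote request, or a vote reply. Based on that, it updates its logical clock (the term), whether it has voted (isVote), and which nodes voted for it in the latest term (map<int,int> // nodeid, term).

2 Each sending thread polls every 200 ms and decreases the remaining wait time according to the node's current state. Depending on the state, the wait time is set either to the 1000 ms heartbeat interval or to a random election timeout of 1500-5000 ms.

Depending on its current state, the node then sends a heartbeat, a vote request, or a vote reply.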
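The design above can be sketched as a single-process simulation of its polling loop. It uses 200 ms ticks, a 1000 ms leader heartbeat, and 1500-5000 ms random election timeouts, and followers check a "heartbeat received" flag when their timer expires. The simulation assumes that messages are delivered instantly and never lost, and it uses no sockets. `Node`, `Cluster`, and their members are hypothetical names, not the `RaftManager` code above.

```cpp
#include <cassert>
#include <random>
#include <vector>

enum class Role { Follower, Candidate, Leader };

struct Node {
    int id = 0;
    Role role = Role::Follower;
    int term = 0;
    int votedFor = -1;          // -1: no vote cast in `term`
    int timeoutMs = 0;          // time left before this node acts
    bool gotHeartbeat = false;  // heartbeat seen during the current wait
};

class Cluster {
public:
    Cluster(int n, unsigned seed) : rng_(seed), dice_(1500, 5000) {
        assert(n >= 1);
        for (int i = 0; i < n; ++i) {
            Node node;
            node.id = i;
            node.timeoutMs = dice_(rng_);
            nodes_.push_back(node);
        }
    }

    // One 200 ms polling step for every node.
    void tick() {
        for (Node& n : nodes_) {
            n.timeoutMs -= kTickMs;
            if (n.timeoutMs > 0)
                continue;
            if (n.role == Role::Leader) {
                broadcast_heartbeat(n);
                n.timeoutMs = kHeartbeatMs;
            } else if (n.role == Role::Follower && n.gotHeartbeat) {
                n.gotHeartbeat = false;   // leader is alive: wait another period
                n.timeoutMs = dice_(rng_);
            } else {
                start_election(n);        // no heartbeat, or the last election failed
                n.timeoutMs = (n.role == Role::Leader) ? kHeartbeatMs : dice_(rng_);
            }
        }
    }

    int leader_count() const {
        int count = 0;
        for (const Node& n : nodes_)
            count += (n.role == Role::Leader) ? 1 : 0;
        return count;
    }

private:
    static constexpr int kTickMs = 200;
    static constexpr int kHeartbeatMs = 1000;

    void start_election(Node& c) {
        c.term += 1;
        c.role = Role::Candidate;
        c.votedFor = c.id;
        int votes = 1;                    // the candidate's own vote
        for (Node& v : nodes_) {
            if (v.id == c.id)
                continue;
            if (v.term < c.term) {        // newer term: step down, forget the old vote
                v.term = c.term;
                v.role = Role::Follower;
                v.votedFor = -1;
            }
            if (v.term == c.term && v.votedFor == -1) {
                v.votedFor = c.id;
                v.timeoutMs = dice_(rng_); // granting a vote resets the voter's timer
                ++votes;
            }
        }
        if (votes > static_cast<int>(nodes_.size()) / 2) {
            c.role = Role::Leader;
            broadcast_heartbeat(c);
        }
    }

    void broadcast_heartbeat(const Node& leader) {
        for (Node& f : nodes_) {
            if (f.id == leader.id || f.term > leader.term)
                continue;                 // a node with a newer term ignores a stale leader
            if (f.term < leader.term) {
                f.term = leader.term;
                f.votedFor = -1;
            }
            f.role = Role::Follower;
            f.gotHeartbeat = true;
        }
    }

    std::mt19937 rng_;
    std::uniform_int_distribution<int> dice_;
    std::vector<Node> nodes_;
};
```

Because the heartbeat interval (1000 ms) is shorter than the smallest election timeout (1500 ms), every follower sees at least one heartbeat per wait, so the first elected leader stays in place. With instant delivery the first candidate always wins. Split votes like the B/D case only appear once message delays are modeled.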

To be continued.

References:

《etcd技术内幕》 (a Chinese book on etcd internals)

http://thesecretlivesofdata.com/raft/#intro