分布式一致性算法Paxos JAVA多线程方式实现

时间:2022-01-06 17:30:27
github地址:

https://github.com/hellolinjx/PaxosImpl/


//	准备提案过程,获得大多数决策者支持后进入确认提案阶段。
public synchronized boolean prepare(){
PrepareResult prepareResult = null;

boolean isContinue = true;
// 已获得承诺个数
int promisedCount = 0;

do{
List<Proposal> promisedProposals = new ArrayList<Proposal>();
List<Proposal> acceptedProposals = new ArrayList<Proposal>();
promisedCount = 0;
for (Acceptor acceptor : acceptors){

prepareResult = acceptor.onPrepare(proposal);
// 随机休眠一段时间,模拟网络延迟。
PaxosUtil.sleepRandom();

// 模拟网络异常
if (null==prepareResult){
continue;
}

// 获得承诺
if (prepareResult.isPromised()){
promisedCount ++;
}else{
// 决策者已经给了更高id题案的承诺
if (prepareResult.getAcceptorStatus()==AcceptorStatus.PROMISED){
promisedProposals.add(prepareResult.getProposal());
}
// 决策者已经通过了一个题案
if (prepareResult.getAcceptorStatus()==AcceptorStatus.ACCEPTED){
acceptedProposals.add(prepareResult.getProposal());
}
}
}// end of for

// 获得多数决策者的承诺
// 可以进行第二阶段:题案提交
if (promisedCount >= halfCount){
break;
}
Proposal votedProposal = votedEnd(acceptedProposals);
// 决策者已经半数通过题案
if (votedProposal !=null){
System.out.println("决策已经投票结束:" + votedProposal);
return true;
}


Proposal maxIdAcceptedProposal = getMaxIdProposal(acceptedProposals);
// 在已经被决策者通过题案中选择序列号最大的决策,作为自己的决策。
if (maxIdAcceptedProposal != null){
proposal.setId(PaxosUtil.generateId());
proposal.setValue(maxIdAcceptedProposal.getValue());
}else{
proposal.setId(PaxosUtil.generateId());
}


}while(isContinue);
return false;
}

// 获得大多数决策者承诺后,开始进行提案确认
public synchronized boolean commit(){
boolean isContinue = true;

// 已获得接受该提案的决策者个数
int acceptedCount = 0;
do{
List<Proposal> acceptedProposals = new ArrayList<Proposal>();
acceptedCount = 0;
for (Acceptor acceptor : acceptors){

CommitResult commitResult = acceptor.onCommit(proposal);
// 模拟网络延迟
PaxosUtil.sleepRandom();

// 模拟网络异常
if (null==commitResult){
continue;
}

// 题案被决策者接受。
if (commitResult.isAccepted()){
acceptedCount ++;
}else{
acceptedProposals.add(commitResult.getProposal());
}
}

// 题案被半数以上决策者接受,说明题案已经被选出来。
if (acceptedCount >= halfCount){
System.out.println("题案已经投票选出:" + proposal);
return true;
}else{
Proposal maxIdAcceptedProposal = getMaxIdProposal(acceptedProposals);
// 在已经被决策者通过题案中选择序列号最大的决策,重新生成递增id,改变自己的value为序列号最大的value。
// 这是一种预测,预测此maxIdAccecptedProposal最有可能被超过半数的决策者接受。
if (maxIdAcceptedProposal != null){
proposal.setId(PaxosUtil.generateId());
proposal.setValue(maxIdAcceptedProposal.getValue());
}else{
proposal.setId(PaxosUtil.generateId());
}

// 回退到决策准备阶段
if (prepare())
return true;
}

}while(isContinue);

return true;
}

//	加锁此准备函数,不允许同时访问。模拟单个决策者串行处理一个请求。	public synchronized PrepareResult onPrepare(Proposal szProposal){		PrepareResult prepareResult = new PrepareResult();				//	模拟网络不正常,发生丢包、超时现象		if	(PaxosUtil.isCrashed()){			PaxosUtil.printStr("Network not normal: " + this.toString());			return null;		}				switch (status){			//	NONE表示之前没有承诺过任何提议者			//	此时,接受提案			case NONE:				prepareResult.setAcceptorStatus(AcceptorStatus.NONE);				prepareResult.setPromised(true);				prepareResult.setProposal(null);				//	转换自身的状态,已经承诺了提议者,并记录承诺的提案。				status = AcceptorStatus.PROMISED;				promisedProposal.copyFromInstance(szProposal);				return prepareResult;			//	已经承诺过任意提议者			case PROMISED:				//	判断提案的先后顺序,只承诺相对较新的提案				if	(promisedProposal.getId() > szProposal.getId()){					prepareResult.setAcceptorStatus(status);					prepareResult.setPromised(false);					prepareResult.setProposal(promisedProposal);					return prepareResult;				}else{					promisedProposal.copyFromInstance(szProposal);					prepareResult.setAcceptorStatus(status);					prepareResult.setPromised(true);					prepareResult.setProposal(promisedProposal);					return prepareResult;				}			//	已经批准过提案			case ACCEPTED:				//	如果是同一个提案,只是序列号增大				//	批准提案,更新序列号。				if	(promisedProposal.getId()<szProposal.getId()						&& promisedProposal.getValue().equals(szProposal.getValue())){					promisedProposal.setId(szProposal.getId());					prepareResult.setAcceptorStatus(status);					prepareResult.setPromised(true);					prepareResult.setProposal(promisedProposal);					return prepareResult;				}else{ 	//	否则,不予批准					prepareResult.setAcceptorStatus(status);					prepareResult.setPromised(false);					prepareResult.setProposal(acceptedProposal);					return prepareResult;				}			default:				//return null;		}				return null;	}		//	加锁此提交函数,不允许同时访问,模拟单个决策者串行决策	public synchronized CommitResult onCommit(Proposal szProposal){		CommitResult commitResult = new CommitResult();		if	(PaxosUtil.isCrashed()){			return null;		}		switch (status){			//	不可能存在此状态			case NONE:	return null;			//	已经承诺过提案			case PROMISED:				//	判断commit提案和承诺提案的序列号大小				//	大于,接受提案。				if	(szProposal.getId() >= promisedProposal.getId()){					promisedProposal.copyFromInstance(szProposal);					acceptedProposal.copyFromInstance(szProposal);					status = AcceptorStatus.ACCEPTED;					commitResult.setAccepted(true);					commitResult.setAcceptorStatus(status);					commitResult.setProposal(promisedProposal);					return commitResult;									}else{	//	小于,回绝提案					commitResult.setAccepted(false);					commitResult.setAcceptorStatus(status);					commitResult.setProposal(promisedProposal);					return commitResult;				}			//	已接受过提案			case ACCEPTED:				//	同一提案,序列号较大,接受				if	(szProposal.getId() > acceptedProposal.getId()						&& szProposal.getValue().equals(acceptedProposal.getValue())){					acceptedProposal.setId(szProposal.getId());					commitResult.setAccepted(true);					commitResult.setAcceptorStatus(status);					commitResult.setProposal(acceptedProposal);					return commitResult;				}else{	//	否则,回绝提案					commitResult.setAccepted(false);					commitResult.setAcceptorStatus(status);					commitResult.setProposal(acceptedProposal);					return commitResult;				}		}						return null;	}