https://github.com/hellolinjx/PaxosImpl/
// 准备提案过程,获得大多数决策者支持后进入确认提案阶段。
public synchronized boolean prepare(){
PrepareResult prepareResult = null;
boolean isContinue = true;
// 已获得承诺个数
int promisedCount = 0;
do{
List<Proposal> promisedProposals = new ArrayList<Proposal>();
List<Proposal> acceptedProposals = new ArrayList<Proposal>();
promisedCount = 0;
for (Acceptor acceptor : acceptors){
prepareResult = acceptor.onPrepare(proposal);
// 随机休眠一段时间,模拟网络延迟。
PaxosUtil.sleepRandom();
// 模拟网络异常
if (null==prepareResult){
continue;
}
// 获得承诺
if (prepareResult.isPromised()){
promisedCount ++;
}else{
// 决策者已经给了更高id题案的承诺
if (prepareResult.getAcceptorStatus()==AcceptorStatus.PROMISED){
promisedProposals.add(prepareResult.getProposal());
}
// 决策者已经通过了一个题案
if (prepareResult.getAcceptorStatus()==AcceptorStatus.ACCEPTED){
acceptedProposals.add(prepareResult.getProposal());
}
}
}// end of for
// 获得多数决策者的承诺
// 可以进行第二阶段:题案提交
if (promisedCount >= halfCount){
break;
}
Proposal votedProposal = votedEnd(acceptedProposals);
// 决策者已经半数通过题案
if (votedProposal !=null){
System.out.println("决策已经投票结束:" + votedProposal);
return true;
}
Proposal maxIdAcceptedProposal = getMaxIdProposal(acceptedProposals);
// 在已经被决策者通过题案中选择序列号最大的决策,作为自己的决策。
if (maxIdAcceptedProposal != null){
proposal.setId(PaxosUtil.generateId());
proposal.setValue(maxIdAcceptedProposal.getValue());
}else{
proposal.setId(PaxosUtil.generateId());
}
}while(isContinue);
return false;
}
// 获得大多数决策者承诺后,开始进行提案确认
public synchronized boolean commit(){
boolean isContinue = true;
// 已获得接受该提案的决策者个数
int acceptedCount = 0;
do{
List<Proposal> acceptedProposals = new ArrayList<Proposal>();
acceptedCount = 0;
for (Acceptor acceptor : acceptors){
CommitResult commitResult = acceptor.onCommit(proposal);
// 模拟网络延迟
PaxosUtil.sleepRandom();
// 模拟网络异常
if (null==commitResult){
continue;
}
// 题案被决策者接受。
if (commitResult.isAccepted()){
acceptedCount ++;
}else{
acceptedProposals.add(commitResult.getProposal());
}
}
// 题案被半数以上决策者接受,说明题案已经被选出来。
if (acceptedCount >= halfCount){
System.out.println("题案已经投票选出:" + proposal);
return true;
}else{
Proposal maxIdAcceptedProposal = getMaxIdProposal(acceptedProposals);
// 在已经被决策者通过题案中选择序列号最大的决策,重新生成递增id,改变自己的value为序列号最大的value。
// 这是一种预测,预测此maxIdAccecptedProposal最有可能被超过半数的决策者接受。
if (maxIdAcceptedProposal != null){
proposal.setId(PaxosUtil.generateId());
proposal.setValue(maxIdAcceptedProposal.getValue());
}else{
proposal.setId(PaxosUtil.generateId());
}
// 回退到决策准备阶段
if (prepare())
return true;
}
}while(isContinue);
return true;
}
// 加锁此准备函数,不允许同时访问。模拟单个决策者串行处理一个请求。 public synchronized PrepareResult onPrepare(Proposal szProposal){ PrepareResult prepareResult = new PrepareResult(); // 模拟网络不正常,发生丢包、超时现象 if (PaxosUtil.isCrashed()){ PaxosUtil.printStr("Network not normal: " + this.toString()); return null; } switch (status){ // NONE表示之前没有承诺过任何提议者 // 此时,接受提案 case NONE: prepareResult.setAcceptorStatus(AcceptorStatus.NONE); prepareResult.setPromised(true); prepareResult.setProposal(null); // 转换自身的状态,已经承诺了提议者,并记录承诺的提案。 status = AcceptorStatus.PROMISED; promisedProposal.copyFromInstance(szProposal); return prepareResult; // 已经承诺过任意提议者 case PROMISED: // 判断提案的先后顺序,只承诺相对较新的提案 if (promisedProposal.getId() > szProposal.getId()){ prepareResult.setAcceptorStatus(status); prepareResult.setPromised(false); prepareResult.setProposal(promisedProposal); return prepareResult; }else{ promisedProposal.copyFromInstance(szProposal); prepareResult.setAcceptorStatus(status); prepareResult.setPromised(true); prepareResult.setProposal(promisedProposal); return prepareResult; } // 已经批准过提案 case ACCEPTED: // 如果是同一个提案,只是序列号增大 // 批准提案,更新序列号。 if (promisedProposal.getId()<szProposal.getId() && promisedProposal.getValue().equals(szProposal.getValue())){ promisedProposal.setId(szProposal.getId()); prepareResult.setAcceptorStatus(status); prepareResult.setPromised(true); prepareResult.setProposal(promisedProposal); return prepareResult; }else{ // 否则,不予批准 prepareResult.setAcceptorStatus(status); prepareResult.setPromised(false); prepareResult.setProposal(acceptedProposal); return prepareResult; } default: //return null; } return null; } // 加锁此提交函数,不允许同时访问,模拟单个决策者串行决策 public synchronized CommitResult onCommit(Proposal szProposal){ CommitResult commitResult = new CommitResult(); if (PaxosUtil.isCrashed()){ return null; } switch (status){ // 不可能存在此状态 case NONE: return null; // 已经承诺过提案 case PROMISED: // 判断commit提案和承诺提案的序列号大小 // 大于,接受提案。 if (szProposal.getId() >= promisedProposal.getId()){ promisedProposal.copyFromInstance(szProposal); acceptedProposal.copyFromInstance(szProposal); status = AcceptorStatus.ACCEPTED; commitResult.setAccepted(true); commitResult.setAcceptorStatus(status); commitResult.setProposal(promisedProposal); return commitResult; }else{ // 小于,回绝提案 commitResult.setAccepted(false); commitResult.setAcceptorStatus(status); commitResult.setProposal(promisedProposal); return commitResult; } // 已接受过提案 case ACCEPTED: // 同一提案,序列号较大,接受 if (szProposal.getId() > acceptedProposal.getId() && szProposal.getValue().equals(acceptedProposal.getValue())){ acceptedProposal.setId(szProposal.getId()); commitResult.setAccepted(true); commitResult.setAcceptorStatus(status); commitResult.setProposal(acceptedProposal); return commitResult; }else{ // 否则,回绝提案 commitResult.setAccepted(false); commitResult.setAcceptorStatus(status); commitResult.setProposal(acceptedProposal); return commitResult; } } return null; }