美文网首页
paxos算法java代码实现

paxos算法java代码实现

作者: YunaY | 来源:发表于2017-02-14 16:48 被阅读0次

    paxos算法以其难以理解而著称,主要体现在两个方面:

    1、Lamport最初的论文以叙事的方式叙述了算法核心

    2、算法即使有非常严谨的推导过程也很难被确定为严格正确的(但目前事实证明确实是有效的)

    目前在网络上很多类似的推导过程,自己看着也很困惑,所以尝试使用代码实现了这个过程,记录如下:

    
    importjava.util.ArrayList;
    
    importjava.util.List;
    
    importjava.util.Random;
    
    /**
    
    * Phase 1
    
    * (a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.
    
    * (b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro-posal (if any) that it has accepted.
    
    * Phase 2
    
    * (a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v , where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.
    
    * (b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.
    
    */
    
    public classPaxos {
    
    public static voidmain(String[] args) {
    
    ComputerManager computerManager =newPaxos().newComputerManager();
    
    try{
    
    computerManager.start(7);
    
    }catch(Exception e) {
    
    e.printStackTrace();
    
    }
    
    }
    
    classComputerManager {
    
    privateListcomputers=newArrayList();//定义一个集合
    
    privateIntegerstartSize;
    
    /**
    
    *启动所有服务器
    
    *
    
    *@throwsException
    
    */
    
    public voidstart(Integer startSize)throwsException {
    
    if(computers!=null&&computers.size() >0)
    
    throw newException("restart error");
    
    this.startSize= startSize;
    
    Paxos paxos =newPaxos();
    
    for(inti =0;i < startSize;i++) {
    
    Computer computer = paxos.newComputer(this);
    
    Thread thread =newThread(computer);
    
    thread.start();
    
    }
    
    }
    
    /**
    
    *启动完成的服务器注册
    
    *
    
    *@return
    
    */
    
    public voidregister(Computer computer) {
    
    computers.add(computer);
    
    }
    
    /**
    
    *获取所有服务器
    
    *
    
    *@return
    
    */
    
    publicIntegergetHelfSize() {
    
    returncomputers.size() /2+1;
    
    //            return startSize / 2 + 1;
    
    }
    
    /**
    
    *获取一个法定集合
    
    *
    
    *@return
    
    */
    
    public synchronizedListgetLegalComputers() {
    
    List list =newArrayList();
    
    intcount =0;
    
    intcomputerSize =computers.size();
    
    inthelfCount = computerSize /2+1;
    
    Random random =newRandom();
    
    while(count < helfCount) {
    
    //生成一个随机数
    
    int_random = Math.abs(random.nextInt(computerSize));
    
    if(_random >=0&& _random < computerSize) {
    
    Computer _computer =computers.get(_random);
    
    if(!list.contains(_computer)) {
    
    list.add(_computer);
    
    count++;
    
    }
    
    }
    
    }
    
    returnlist;
    
    }
    
    }
    
    classComputerimplementsRunnable {
    
    privateIntegerid= Math.abs(newRandom().nextInt());//服务器ID
    
    privateIntegermaxN;//当前接收到的提案号
    
    privateIntegeracceptN;//已经同意的提案号
    
    privateIntegeracceptV;//已经同意的提案号对应的值
    
    privateComputerManagercomputerManager;
    
    Computer(ComputerManager computerManager) {
    
    this.computerManager= computerManager;
    
    }
    
    public synchronizedObject[]prepaer(Integer acceptN) {
    
    System.out.println("---------------------------------------------------分割线------------------------------");
    
    System.out.println(acceptN +"申请提案:"+this.id+".........."+this.maxN+"........"+this.acceptN+"......"+this.acceptV);
    
    /*这里模拟一个断网情况,如果随机为2则断网*/
    
    Random random =newRandom();
    
    intstate = random.nextInt(10);
    
    if(state ==2)
    
    return null;
    
    /*以下为正常情况*/
    
    //如果之前没有接受过提案,直接返回null
    
    if(maxN==null) {
    
    this.maxN= acceptN;//令当前接收到的提案号=当前申请的提案号
    
    return newObject[]{"pok", null, null};
    
    }
    
    if(maxN> acceptN) {
    
    //由于当前申请提案号小于已经同意的提案号,所以不接收提案申请
    
    return newObject[]{"error", null, null};
    
    }
    
    if(acceptN >maxN) {//判断新申请的提案是否为新提案
    
    this.maxN= acceptN;//令当前接收到的提案号=当前申请的提案号
    
    if(this.acceptN==null) {//如果之前没有通过任何提案,返回null
    
    return newObject[]{"pok", null, null};
    
    }else{
    
    //如果之前同意过提案,返回最后同意的提案编号和提案值
    
    return newObject[]{"pok", this.acceptN, this.acceptV};
    
    }
    
    }
    
    return null;
    
    }
    
    public synchronizedStringaccept(Integer acceptN,Integer acceptV) {
    
    //首先当前申请的提案号acceptN不能小于maxN
    
    if(maxN<= acceptN) {
    
    maxN= acceptN;
    
    this.acceptN= acceptN;
    
    this.acceptV= acceptV;
    
    return"aok";
    
    }
    
    return"error";
    
    }
    
    /**
    
    *进行选举
    
    */
    
    public voidpaxos(Computer computer) {
    
    //获取一个法定集合
    
    List computers =computerManager.getLegalComputers();
    
    Integer _acceptN =0;
    
    Integer _acceptV =0;
    
    intcount =0;
    
    Integer cid = CId.getCid();
    
    for(Computer _computer : computers) {
    
    Object[] prepaer = _computer.prepaer(cid);//申请提交提案
    
    if(prepaer ==null)
    
    continue;
    
    System.out.println(cid +"("+ _acceptN +":"+ _acceptV +")"+"返回提案:"+ _computer.id+".........."+ prepaer[0] +"........"+ prepaer[1] +"......"+ prepaer[2]);
    
    String state = (String) prepaer[0];
    
    if("pok".equals(state))//接收到申请的情况
    
    {
    
    count++;
    
    if(_acceptN ==0&& prepaer[1] ==null) {
    
    //生成一个新的acceptV
    
    _acceptV = computer.id;
    
    }else{
    
    Integer acceptN = (Integer) prepaer[1];
    
    Integer acceptV = (Integer) prepaer[2];
    
    //使用返回的acceptV
    
    if(acceptN >= _acceptN) {
    
    _acceptN = acceptN;
    
    _acceptV = acceptV;
    
    }
    
    }
    
    }
    
    }
    
    //如果接收到的回复超过了半数,则正式提交提案
    
    if(count >=computerManager.getHelfSize()) {
    
    _acceptN = cid;
    
    //获取一个法定集合
    
    List computers1 =computerManager.getLegalComputers();
    
    intacount =0;
    
    for(Computer _computer : computers1) {
    
    System.out.println(_acceptN +"("+ _acceptV +")"+"提交提案:"+ _computer.id+".........."+ _computer.maxN+"........"+ _computer.acceptN+"......"+ _computer.acceptV);
    
    String accept = _computer.accept(_acceptN,_acceptV);//申请提交提案
    
    if("aok".equals(accept)) {
    
    acount++;
    
    }
    
    }
    
    if(acount >=computerManager.getHelfSize()) {
    
    System.out.println("提案被多数通过:"+ _acceptN +"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+ _acceptV);
    
    for(Computer _computer : computers1) {
    
    System.out.println(_computer.id+".........."+ _computer.maxN+"........"+ _computer.acceptN+"......"+ _computer.acceptV);
    
    }
    
    }
    
    }
    
    }
    
    /**
    
    *启动命令
    
    */
    
    public voidrun() {
    
    //            Random random = new Random();
    
    //            try {
    
    //                Thread.sleep(random.nextInt(10) * 1000);//随机延迟几秒,模拟消息发送过程或启动过程
    
    //            } catch (InterruptedException e) {
    
    //                e.printStackTrace();
    
    //            }
    
    Computer computer =this;
    
    computerManager.register(computer);//注册到启动集群中
    
    paxos(computer);
    
    }
    
    }
    
    /**
    
    *提案号管理类d
    
    */
    
    static classCId {
    
    privateIntegercid=1;
    
    privateCId() {
    
    }
    
    private staticCIdinstance=newCId();
    
    public synchronized staticIntegergetCid() {
    
    returninstance.cid++;
    
    }
    
    }
    
    }
    

    程序在运行过程中会出现异常,但是也能选出合适的人选作为Leader,这不正是paxos所期望的事情么?

    相关文章

      网友评论

          本文标题:paxos算法java代码实现

          本文链接:https://www.haomeiwen.com/subject/nbzxwttx.html