美文网首页
leaderLatch 选主

leaderLatch 选主

作者: Audience0 | 来源:发表于2019-12-20 15:26 被阅读0次

    创建临时有序节点,并进行排序,获取当前节点在排序中的下标index。
    如果index小于0,表示当前节点不在排序列表中(节点已过期或已被删除),则调用reset()进行重置:删除旧节点并重新创建临时有序节点
    如果index等于0,表示当前节点为主节点,则调用LeaderLatchListener.isLeader()
    如果index大于0,表示当前节点不是主节点,则监控前一个节点的删除事件(LeaderLatchListener.notLeader()方法只会在节点由主节点变为非主节点时,由setLeadership(false)触发)。当监控到前一个节点的删除事件时,重新获取子节点并排序、比较,得到新的index;如果此时index为0,则当前节点成为新的master,并调用LeaderLatchListener.isLeader()


    实例

    /**
     * Scheduler factory bean whose scheduler is started and stopped by the
     * leadership callbacks of a Curator {@link LeaderLatch}.
     *
     * <p>Auto-startup is disabled at construction time. The nested
     * {@link MyLeaderLatchListener} enables it and calls {@code start()} when
     * leadership is acquired, and disables it and calls {@code stop()} when
     * leadership is lost.
     */
    public class ZkSchedulerFactoryBean extends SchedulerFactoryBean {

        private static final Logger LOGGER = LoggerFactory.getLogger(ZkSchedulerFactoryBean.class);

        private static final String LEAD_PATH = "/lead";

        /** Created in {@link #afterPropertiesSet()}; {@code null} until then. */
        private LeaderLatch leaderLatch;

        @Autowired
        private ZkClient zkClient;

        /**
         * Creates the factory with auto-startup disabled.
         *
         * <p>The latch is deliberately NOT created here: {@code zkClient} is a
         * field-injected dependency and is still {@code null} while the
         * constructor runs, so touching it here would throw a
         * {@link NullPointerException}.
         *
         * @throws Exception kept in the signature for backward compatibility
         */
        public ZkSchedulerFactoryBean() throws Exception {
            this.setAutoStartup(false);
        }

        /**
         * Lets the parent finish its own initialization, then creates and starts
         * the leader latch now that {@code zkClient} has been injected.
         *
         * @throws Exception if parent initialization or starting the latch fails
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            super.afterPropertiesSet();

            // Initialize the LeaderLatch, using the node's IP as participant id.
            leaderLatch = new LeaderLatch(zkClient.getClient(), LEAD_PATH, zkClient.getIp());
            leaderLatch.addListener(new MyLeaderLatchListener(this, zkClient.getIp()));
            leaderLatch.start();
        }

        /**
         * Starts the scheduler only while auto-startup is enabled; otherwise
         * does nothing.
         *
         * @param scheduler    the scheduler to start
         * @param startupDelay delay forwarded unchanged to the parent implementation
         * @throws SchedulerException if the parent implementation fails
         */
        @Override
        public void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException {
            if (this.isAutoStartup()) {
                super.startScheduler(scheduler, startupDelay);
            }
        }

        /**
         * Closes the leader latch quietly, then delegates to the parent's
         * {@code destroy()}.
         *
         * @throws SchedulerException if the parent implementation fails
         */
        @Override
        public void destroy() throws SchedulerException {
            CloseableUtils.closeQuietly(leaderLatch);
            super.destroy();
        }

        /**
         * Leadership listener that starts the wrapped factory bean on gaining
         * leadership and stops it on losing leadership.
         */
        class MyLeaderLatchListener implements LeaderLatchListener {

            private final SchedulerFactoryBean schedulerFactoryBean;

            /** Identifier of this node, used only in log output. */
            private final String ip;

            /**
             * @param schedulerFactoryBean the factory bean to start and stop
             * @param ip                   identifier of this node for logging
             */
            public MyLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean, String ip) {
                this.schedulerFactoryBean = schedulerFactoryBean;
                this.ip = ip;
            }

            /** Enables auto-startup and starts the factory bean. */
            @Override
            public void isLeader() {
                // The ip argument fills the "{}" placeholder, which was previously left empty.
                LOGGER.info("ip:{},成为leader,执行Scheduler~!", ip);
                schedulerFactoryBean.setAutoStartup(true);
                schedulerFactoryBean.start();
            }

            /** Disables auto-startup and stops the factory bean. */
            @Override
            public void notLeader() {
                LOGGER.info("ip:{},不是leader,停止Scheduler~!", ip);
                schedulerFactoryBean.setAutoStartup(false);
                schedulerFactoryBean.stop();
            }
        }
    }
    
    

    leaderLatch 分析

    /**
     * Starts the latch. May only be called once: the state must move from
     * LATENT to STARTED, otherwise the precondition check fails.
     *
     * @throws Exception declared by the original signature
     */
    public void start() throws Exception
        {
            // Atomic compare-and-set on the state reference guards against a second start.
            Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

            Runnable startup = new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        internalStart();
                    }
                    finally
                    {
                        // Drop the task handle no matter how internalStart() finished.
                        startTask.set(null);
                    }
                }
            };

            // Hand the startup work to AfterConnectionEstablished and remember the handle it returns.
            // NOTE(review): presumably the runnable executes on another thread once connected - confirm.
            startTask.set(AfterConnectionEstablished.execute(client, startup));
        }
    
     /**
      * Performs the actual startup work: registers the connection-state
      * listener and resets the latch. Does nothing unless the state is STARTED.
      */
     private synchronized void internalStart()
        {
            if ( state.get() != State.STARTED )
            {
                return;
            }

            // Register the connection-state listener; its callback forwards to
            // handleStateChange(), which may end up invoking setLeadership().
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    /**
     * Connection-state listener that internalStart() registers on the client.
     * Every state change is forwarded to handleStateChange().
     */
    private final ConnectionStateListener listener = new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework curator, ConnectionState connectionState)
            {
                // Only the new state matters; the client argument is not used.
                handleStateChange(connectionState);
            }
        };
    
    /**
     * Reacts to a connection-state transition.
     *
     * <ul>
     *   <li>LOST: leadership is given up.</li>
     *   <li>SUSPENDED: leadership is given up only if the error policy treats
     *       SUSPENDED as an error state.</li>
     *   <li>RECONNECTED: the latch is reset if the error policy treats SUSPENDED
     *       as an error state or this instance does not hold leadership.</li>
     *   <li>Any other state: nothing happens.</li>
     * </ul>
     *
     * @param newState the state the connection has just entered
     */
    private void handleStateChange(ConnectionState newState)
        {
            switch ( newState )
            {
                case LOST:
                {
                    // Connection lost: no longer the leader.
                    setLeadership(false);
                    break;
                }

                case SUSPENDED:
                {
                    boolean suspendedIsError = client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED);
                    if ( suspendedIsError )
                    {
                        setLeadership(false);
                    }
                    break;
                }

                case RECONNECTED:
                {
                    try
                    {
                        // hasLeadership (an AtomicBoolean) records whether this instance is the leader.
                        boolean mustReset = client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)
                            || !hasLeadership.get();
                        if ( mustReset )
                        {
                            reset();
                        }
                    }
                    catch ( Exception e )
                    {
                        ThreadUtils.checkInterrupted(e);
                        log.error("Could not reset leader latch", e);
                        setLeadership(false);
                    }
                    break;
                }

                default:
                {
                    // NOP
                    break;
                }
            }
        }
    
    /**
     * Records the new leadership flag and, if it actually changed, notifies
     * every registered LeaderLatchListener: notLeader() on a leader to
     * non-leader transition, isLeader() on the opposite transition. Threads
     * waiting on this object's monitor are always woken afterwards.
     *
     * @param newValue true if this instance is now the leader
     */
    private synchronized void setLeadership(boolean newValue)
        {
            // Swap in the new flag and keep the previous one for comparison.
            boolean previous = hasLeadership.getAndSet(newValue);

            if ( previous != newValue )
            {
                Function<LeaderLatchListener, Void> notification;
                if ( newValue )
                {
                    // Was not the leader, is now: fire isLeader().
                    notification = new Function<LeaderLatchListener, Void>()
                    {
                        @Override
                        public Void apply(LeaderLatchListener input)
                        {
                            input.isLeader();
                            return null;
                        }
                    };
                }
                else
                {
                    // Was the leader, is no longer: fire notLeader().
                    notification = new Function<LeaderLatchListener, Void>()
                    {
                        @Override
                        public Void apply(LeaderLatchListener listener)
                        {
                            listener.notLeader();
                            return null;
                        }
                    };
                }
                listeners.forEach(notification);
            }

            notifyAll();
        }
    
    /**
     * Gives up leadership, discards the current node and creates a fresh
     * ephemeral-sequential node in the background. When the creation succeeds,
     * the callback stores the new node path and, unless the latch has been
     * closed in the meantime, fetches the children to determine leadership.
     *
     * @throws Exception if deleting the old node or issuing the create call fails
     */
    @VisibleForTesting
        void reset() throws Exception
        {
            // Not the leader while resetting.
            setLeadership(false);
            // Forget the old node first; setNode() deletes it if there was one.
            setNode(null);

            // Passed to inBackground(...) below; receives the result of the create call.
            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( debugResetWaitLatch != null )
                    {
                        debugResetWaitLatch.await();
                        debugResetWaitLatch = null;
                    }

                    if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
                    {
                        log.error("getChildren() failed. rc = " + event.getResultCode());
                        return;
                    }

                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        // Closed while the create was in flight: remove the node just created.
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
            };

            // Create an ephemeral, sequential node under the latch path.
            client.create()
                .creatingParentContainersIfNeeded()
                .withProtection()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .inBackground(callback)
                .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
        }
    
    /**
     * Replaces the remembered node path with {@code newValue} and, if a path
     * was previously stored, issues a guaranteed background delete for it.
     *
     * @param newValue the new node path, or null to only discard the old one
     * @throws Exception if issuing the delete fails
     */
    private void setNode(String newValue) throws Exception
        {
            String previousPath = ourPath.getAndSet(newValue);
            if ( previousPath == null )
            {
                // Nothing was stored before, so there is nothing to delete.
                return;
            }
            client.delete().guaranteed().inBackground().forPath(previousPath);
        }
    
    /**
     * Requests the children of the latch path in the background and, when the
     * request succeeds, passes them to checkLeadership(). Any other result
     * code is ignored.
     *
     * @throws Exception if issuing the request fails
     */
    private void getChildren() throws Exception
        {
            BackgroundCallback onChildren = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
                    {
                        return;
                    }
                    checkLeadership(event.getChildren());
                }
            };

            // List the child nodes; the callback above handles the result.
            client.getChildren()
                .inBackground(onChildren)
                .forPath(ZKPaths.makePath(latchPath, null));
        }
    
     /**
      * Determines whether this instance's node is the leader among the given
      * children.
      *
      * <ul>
      *   <li>Our node is missing from the sorted list: log and reset().</li>
      *   <li>Our node is first: leadership is acquired.</li>
      *   <li>Otherwise: watch the node immediately ahead of ours and re-check
      *       once it is deleted.</li>
      * </ul>
      *
      * @param children the child node names under the latch path
      * @throws Exception if resetting or setting the watch fails
      */
    private void checkLeadership(List<String> children) throws Exception
        {
            if ( debugCheckLeaderShipLatch != null )
            {
                debugCheckLeaderShipLatch.await();
            }

            // Snapshot of our node path; captured by the watcher below.
            final String localOurPath = ourPath.get();
            // Sort all participant nodes.
            List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);

            // Position of our node in the sorted list, or -1 if we have no node.
            int ourIndex = -1;
            if ( localOurPath != null )
            {
                ourIndex = sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath));
            }

            if ( ourIndex < 0 )
            {
                // Our node is not in the list (stale node): start over.
                log.error("Can't find our node. Resetting. Index: " + ourIndex);
                reset();
                return;
            }

            if ( ourIndex == 0 )
            {
                // First in line: this instance is the leader.
                setLeadership(true);
                return;
            }

            // Not the leader: watch the node directly ahead of ours.
            String watchPath = sortedChildren.get(ourIndex - 1);

            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() != State.STARTED) || (event.getType() != Event.EventType.NodeDeleted) || (localOurPath == null) )
                    {
                        return;
                    }
                    try
                    {
                        // Predecessor is gone: fetch the children again to re-evaluate leadership.
                        getChildren();
                    }
                    catch ( Exception ex )
                    {
                        ThreadUtils.checkInterrupted(ex);
                        log.error("An error occurred checking the leadership.", ex);
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // The watched node no longer exists: start over.
                        reset();
                    }
                }
            };

            // Set the watch on the predecessor via a background getData call.
            client.getData()
                .usingWatcher(watcher)
                .inBackground(callback)
                .forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    

    相关文章

      网友评论

          本文标题:leaderLatch 选主

          本文链接:https://www.haomeiwen.com/subject/ycybwctx.html