美文网首页
Jetcd 实现主从选举(示例代码)

Jetcd 实现主从选举(示例代码)

作者: 听歌闭麦开始自闭 | 来源:发表于2024-02-27 16:19 被阅读0次
    @Slf4j
    @Component
    public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
        private final Client jetcdClient;
        private final String electionNameText = "/testElection";
        private final String firstNonLoopbackAddress;
    
        private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
        private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
    
        public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
            this.jetcdClient = jetcdClient;
            this.firstNonLoopbackAddress = "127.0.0.1:" + port;
        }
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            new Thread(this::run).start();
        }
    
        private void run() {
            ByteSequence electionName = this.getElectionName(this.electionNameText);
            ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
            log.info("[Election] 正在执行选举 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());
    
            Election electionClient = this.jetcdClient.getElectionClient();
            electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
    
            LeaderResponse leader = null;
            try {
                leader = electionClient.leader(electionName).get(); // etcd中没有leader会报错
            } catch (Exception ignored) {
            }
    
            if (leader == null) {
                log.info("[Election] 检测到leader不存在,当前实例正在尝试参选...");
                this.doElect(electionName, proposal);
            }
        }
    
        private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
            Lease leaseClient = this.jetcdClient.getLeaseClient();
            Election electionClient = this.jetcdClient.getElectionClient();
    
            LeaseGrantResponse lease;
            try {
                lease = leaseClient.grant(15).get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("[Election] 选举时发生异常,无法获得租约!", e);
                return false;
            }
            long leaseID = lease.getID();
    
            try {
                electionClient.campaign(electionName, leaseID, proposal).get(5, TimeUnit.SECONDS);
    
                // leader选举后进行自动续约,请求发送周期为: lease.ttl / 3
                leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
                log.info("[Election] 选举完成,当前实例当选成功!");
                return true;
            } catch (InterruptedException | ExecutionException e) {
                log.error("[Election] 选举时发生异常,无法获得租约!", e);
                return false;
            } catch (TimeoutException e) {
                log.info("[Election] 当前实例未能选中,停止参选中.");
                return false;
            }
        }
    
        private void handleLeaderResponse(LeaderResponse response) {
            Watch watchClient = this.jetcdClient.getWatchClient();
    
            KeyValue kv = response.getKv();
            ByteSequence proposalKey = kv.getKey(); // ${electionName}/${随机字符串}
            log.info("[Election] 选举完成,当选实例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());
    
            Watch.Watcher oldWatcher = this.watcher.get();
            if (oldWatcher != null) {
                oldWatcher.close();
            }
    
            Watch.Watcher newWatcher = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
            this.watcher.compareAndSet(oldWatcher, newWatcher);
        }
    
        private void handleLeaderChange(WatchResponse response) {
            ByteSequence electionName = this.getElectionName(this.electionNameText);
            ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
            Election electionClient = this.jetcdClient.getElectionClient();
    
            List<WatchEvent> events = response.getEvents();
            WatchEvent watchEvent = null;
            if (events != null && !events.isEmpty()) {
                watchEvent = events.get(0);
            }
            WatchEvent.EventType eventType = Optional.ofNullable(watchEvent).map(WatchEvent::getEventType).orElse(null);
    
            if (this.connectionExceptionFlag.get()) {
                log.info("[Election] [{}] 检测到与etcd连接异常,重新注册observe服务.", eventType);
    
                electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
                this.connectionExceptionFlag.compareAndSet(true, false);
            } else {
                log.info("[Election] [{}] 检测到leader变动事件,当前实例正在尝试参选...", eventType);
            }
            this.doElect(electionName, proposal); 
        }
    
        private void handleWatchError(Throwable throwable) {
            // 发现和etcd连接出现了异常
            this.connectionExceptionFlag.compareAndSet(false, true);
        }
    
        public ByteSequence getElectionName(String electionNameText) {
            return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
        }
    
        public ByteSequence getProposal(String firstNonLoopbackAddress) {
            return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
        }
    
    
        private static class LeaderElectionListener implements Election.Listener {
            private final Consumer<LeaderResponse> leaderResponseConsumer;
            private final ExecutorService executor;
    
            public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
                this.leaderResponseConsumer = leaderResponseConsumer;
                this.executor = executor;
            }
    
            @Override
            public void onNext(LeaderResponse response) {
                CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally((e) -> {
                    if (e != null) {
                        e.printStackTrace();
                    }
                    return null;
                });
            }
    
            @Override
            public void onError(Throwable throwable) {
                log.error(throwable.getMessage(), new RuntimeException(throwable));
            }
    
            @Override
            public void onCompleted() {
            }
        }
    
        private static class LeaderWatchListener implements Watch.Listener {
            private final Consumer<WatchResponse> leaderChangeConsumer;
            private final Consumer<Throwable> onErrorConsumer;
            private final ExecutorService executor;
    
            public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
                this.leaderChangeConsumer = leaderChangeConsumer;
                this.onErrorConsumer = onErrorConsumer;
                this.executor = executor;
            }
    
            @Override
            public void onNext(WatchResponse response) {
                CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally((e) -> {
                    if (e != null) {
                        log.error(e.getMessage(), e);
                    }
                    return null;
                });
            }
    
            @Override
            public void onError(Throwable throwable) {
                RuntimeException t = new RuntimeException(throwable);
                log.error(throwable.getMessage(), t);
                this.onErrorConsumer.accept(t);
            }
    
            @Override
            public void onCompleted() {
            }
        }
    
        private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {
    
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
            }
    
            @Override
            public void onError(Throwable throwable) {
                log.error(throwable.getMessage(), new RuntimeException(throwable));
            }
    
            @Override
            public void onCompleted() {
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Jetcd 实现主从选举(示例代码)

          本文链接:https://www.haomeiwen.com/subject/uvdtzdtx.html