/**
 * Takes part in an etcd leader election through the jetcd client.
 *
 * <p>After the Spring context is refreshed, a background thread registers an observer on the
 * election, asks etcd for the current leader and campaigns when no leader could be obtained.
 * Every observed leader is watched; any watch event on the leader key triggers a new campaign.
 * Observer and watch callbacks are handed over to a dedicated single-thread executor.
 */
@Slf4j
@Component
public class JetcdElectionService implements ApplicationListener<ContextRefreshedEvent> {
    /** TTL, in seconds, of the lease that backs this instance's candidacy. */
    private static final long LEASE_TTL_SECONDS = 15L;
    /** Maximum time, in seconds, to wait for a campaign to be won. */
    private static final long CAMPAIGN_TIMEOUT_SECONDS = 5L;

    private final Client jetcdClient;
    private final String electionNameText = "/testElection";
    // Despite the name, this is always "127.0.0.1:<server.port>" (see the constructor).
    private final String firstNonLoopbackAddress;
    /** Watcher on the key of the most recently observed leader, if any. */
    private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>();
    /** Set when the leader watch reports an error; consumed by the next watch event. */
    private final AtomicBoolean connectionExceptionFlag = new AtomicBoolean();
    /** Ensures the election bootstrap runs once even if the context is refreshed repeatedly. */
    private final AtomicBoolean started = new AtomicBoolean();
    // Daemon thread: this executor is never shut down explicitly, so it must not keep the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread worker = new Thread(task, "jetcd-election-callback");
        worker.setDaemon(true);
        return worker;
    });

    /**
     * @param jetcdClient client used for the election, lease and watch APIs
     * @param port        value of {@code server.port}; becomes part of this instance's proposal
     */
    public JetcdElectionService(Client jetcdClient, @Value("${server.port}") int port) {
        this.jetcdClient = jetcdClient;
        this.firstNonLoopbackAddress = "127.0.0.1:" + port;
    }

    /**
     * Starts the election bootstrap on a background thread. Only the first event has an effect;
     * later refresh events are ignored so that observers and campaigns are not duplicated.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        Thread bootstrap = new Thread(this::run, "jetcd-election-bootstrap");
        bootstrap.setDaemon(true);
        bootstrap.start();
    }

    /** Registers the election observer and campaigns if no current leader could be obtained. */
    private void run() {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        log.info("[Election] 正在执行选举 [electionName: {}, proposal: {}]...", electionName.toString(), proposal.toString());
        Election electionClient = this.jetcdClient.getElectionClient();
        electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
        LeaderResponse leader = null;
        try {
            leader = electionClient.leader(electionName).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("[Election] 查询leader时线程被中断,放弃本次参选.", e);
            return;
        } catch (Exception e) {
            // The leader query fails when etcd holds no leader for the election, so a failure
            // here is treated as "no leader" rather than as a fatal error.
            log.debug("[Election] 查询leader失败,视为leader不存在.", e);
        }
        if (leader == null) {
            log.info("[Election] 检测到leader不存在,当前实例正在尝试参选...");
            this.doElect(electionName, proposal);
        }
    }

    /**
     * Grants a lease and campaigns with it, waiting a bounded time for the result.
     *
     * <p>On success the lease is kept alive. On any unsuccessful exit the lease is revoked so
     * that it does not linger in etcd until its TTL expires.
     *
     * @param electionName name of the election to campaign in
     * @param proposal     value this instance proposes
     * @return {@code true} if the campaign completed within the timeout, {@code false} otherwise
     */
    private boolean doElect(ByteSequence electionName, ByteSequence proposal) {
        Lease leaseClient = this.jetcdClient.getLeaseClient();
        Election electionClient = this.jetcdClient.getElectionClient();
        LeaseGrantResponse lease;
        try {
            lease = leaseClient.grant(LEASE_TTL_SECONDS).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        } catch (ExecutionException e) {
            log.error("[Election] 选举时发生异常,无法获得租约!", e);
            return false;
        }
        long leaseID = lease.getID();
        boolean elected = false;
        try {
            electionClient.campaign(electionName, leaseID, proposal).get(CAMPAIGN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            // Renew the lease automatically after winning (original author's note: requests
            // are sent every lease.ttl / 3).
            leaseClient.keepAlive(leaseID, new LeaseKeepaliveObserver());
            log.info("[Election] 选举完成,当前实例当选成功!");
            elected = true;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("[Election] 选举时发生异常,参选被中断!", e);
            return false;
        } catch (ExecutionException e) {
            log.error("[Election] 选举时发生异常,参选失败!", e);
            return false;
        } catch (TimeoutException e) {
            log.info("[Election] 当前实例未能选中,停止参选中.");
            return false;
        } finally {
            if (!elected) {
                this.revokeLease(leaseClient, leaseID);
            }
        }
    }

    /** Revokes a lease on a best-effort basis; failures are logged and never propagated. */
    private void revokeLease(Lease leaseClient, long leaseID) {
        try {
            leaseClient.revoke(leaseID).whenComplete((ignored, error) -> {
                if (error != null) {
                    log.warn("[Election] 撤销租约失败 [leaseID: {}]", leaseID, error);
                }
            });
        } catch (RuntimeException e) {
            log.warn("[Election] 撤销租约失败 [leaseID: {}]", leaseID, e);
        }
    }

    /**
     * Handles a leader notification from the election observer: logs the leader and moves the
     * watch from the previous leader key to the new one.
     */
    private void handleLeaderResponse(LeaderResponse response) {
        Watch watchClient = this.jetcdClient.getWatchClient();
        KeyValue kv = response.getKv();
        ByteSequence proposalKey = kv.getKey(); // ${electionName}/${random string}
        log.info("[Election] 选举完成,当选实例信息[key: {}, value: {}]", proposalKey.toString(), kv.getValue().toString());
        // Close the previous watcher before opening the new one, as before.
        Watch.Watcher previous = this.watcher.getAndSet(null);
        if (previous != null) {
            previous.close();
        }
        Watch.Watcher created = watchClient.watch(proposalKey, new LeaderWatchListener(this::handleLeaderChange, this::handleWatchError, this.executor));
        // Swap unconditionally and close whatever was displaced, so no watcher is ever leaked.
        Watch.Watcher displaced = this.watcher.getAndSet(created);
        if (displaced != null) {
            displaced.close();
        }
    }

    /**
     * Handles a watch event on the leader key. If a watch error was flagged earlier, the
     * election observer is registered again first. In every case a new campaign is attempted.
     */
    private void handleLeaderChange(WatchResponse response) {
        ByteSequence electionName = this.getElectionName(this.electionNameText);
        ByteSequence proposal = this.getProposal(this.firstNonLoopbackAddress);
        Election electionClient = this.jetcdClient.getElectionClient();
        List<WatchEvent> events = response.getEvents();
        WatchEvent firstEvent = (events == null || events.isEmpty()) ? null : events.get(0);
        // Only used for logging; null when the response carries no event.
        WatchEvent.EventType eventType = Optional.ofNullable(firstEvent).map(WatchEvent::getEventType).orElse(null);
        if (this.connectionExceptionFlag.get()) {
            log.info("[Election] [{}] 检测到与etcd连接异常,重新注册observe服务.", eventType);
            electionClient.observe(electionName, new LeaderElectionListener(this::handleLeaderResponse, this.executor));
            this.connectionExceptionFlag.compareAndSet(true, false);
        } else {
            log.info("[Election] [{}] 检测到leader变动事件,当前实例正在尝试参选...", eventType);
        }
        this.doElect(electionName, proposal);
    }

    /** Records that the leader watch reported an error (a problem with the etcd connection). */
    private void handleWatchError(Throwable throwable) {
        this.connectionExceptionFlag.compareAndSet(false, true);
    }

    /**
     * @param electionNameText election name as text
     * @return the UTF-8 encoded election name
     */
    public ByteSequence getElectionName(String electionNameText) {
        return ByteSequence.from(electionNameText, StandardCharsets.UTF_8);
    }

    /**
     * @param firstNonLoopbackAddress address text to propose
     * @return the UTF-8 encoded proposal
     */
    public ByteSequence getProposal(String firstNonLoopbackAddress) {
        return ByteSequence.from(firstNonLoopbackAddress, StandardCharsets.UTF_8);
    }

    /** Election observer that forwards each leader response to a consumer on the given executor. */
    private static class LeaderElectionListener implements Election.Listener {
        private final Consumer<LeaderResponse> leaderResponseConsumer;
        private final ExecutorService executor;

        public LeaderElectionListener(Consumer<LeaderResponse> leaderResponseConsumer, ExecutorService executor) {
            this.leaderResponseConsumer = leaderResponseConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(LeaderResponse response) {
            CompletableFuture.runAsync(() -> this.leaderResponseConsumer.accept(response), this.executor).exceptionally(e -> {
                // Report through the logger, like the watch listener does, instead of stderr.
                log.error(e.getMessage(), e);
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }

    /**
     * Watch listener that forwards each watch response to a consumer on the given executor and
     * reports watch errors, wrapped in a {@link RuntimeException}, to an error consumer.
     */
    private static class LeaderWatchListener implements Watch.Listener {
        private final Consumer<WatchResponse> leaderChangeConsumer;
        private final Consumer<Throwable> onErrorConsumer;
        private final ExecutorService executor;

        public LeaderWatchListener(Consumer<WatchResponse> leaderChangeConsumer, Consumer<Throwable> onErrorConsumer, ExecutorService executor) {
            this.leaderChangeConsumer = leaderChangeConsumer;
            this.onErrorConsumer = onErrorConsumer;
            this.executor = executor;
        }

        @Override
        public void onNext(WatchResponse response) {
            CompletableFuture.runAsync(() -> this.leaderChangeConsumer.accept(response), this.executor).exceptionally(e -> {
                log.error(e.getMessage(), e);
                return null;
            });
        }

        @Override
        public void onError(Throwable throwable) {
            RuntimeException wrapped = new RuntimeException(throwable);
            log.error(throwable.getMessage(), wrapped);
            this.onErrorConsumer.accept(wrapped);
        }

        @Override
        public void onCompleted() {
        }
    }

    /** Keep-alive observer that ignores responses and only logs errors. */
    private static class LeaseKeepaliveObserver implements StreamObserver<LeaseKeepAliveResponse> {
        @Override
        public void onNext(LeaseKeepAliveResponse value) {
        }

        @Override
        public void onError(Throwable throwable) {
            log.error(throwable.getMessage(), new RuntimeException(throwable));
        }

        @Override
        public void onCompleted() {
        }
    }
}
// "网友评论" ("Reader comments") - apparently a leftover web-page heading, not part of the program.