简介
在1.x的时候,Elasticsearch默认的集群通信是多播方式,也支持单播,而从2.x开始,默认是单播模式。到目前的7.x,多播模式以及不再使用,而且源码中不再包含有多播的方式。
Ping机制
ZenDiscovery中ZenPing
是用来进行节点通信探测的接口,目前7.x其实现类是UnicastZenPing
。
ZenPing
包括几个内部工具类
- PingCollection 收集Ping通信的结果
- PingResponse Ping通信的结果
提供两个方法
- start() 启动服务
- void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout); 提供ping消息通信的接口方法
UnicastZenPing
构造器
主要干了以下几件事:
- 为
transportService
注册一个RequestHandler
,使用SAME
线程池 - 创建Ping机制的守护线程池,这里使用的是
EsExecutors.newScaling()
对外方法
主要实现ZenPing接口的两个方法
- start() 实际没有进行任何操作。
- 提供ping的基础操作。这个方法的调用时在ZenDiscovery中,是在master不存在的时候进行调用。
Ping流程
- 获取所有的seed地址,这里首先获取配置中的所有的seed list
- 获取所有的DiscoveryNodes 这是之前的集群信息中保存的节点,contextProvider实际是ZenDisovery
- 遍历节点,获取所有的master节点,加入到seed list
- 构建PingingRound ,用来进行Ping操作
- 构建一个Runnable对象,作为Ping操作的发送器
- 在线程池中进行三次ping发送请求,三次有间隔
主要流程见以下代码,具体逻辑就不展开一一细说了:
protected void ping(final Consumer<PingCollection> resultsConsumer,
final TimeValue scheduleDuration,
final TimeValue requestDuration) {
//1. 获取所有的seed地址,这里首先获取配置中的所有的seed
final List<TransportAddress> seedAddresses = new ArrayList<>();
seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
// 2. 获取所有的DiscoveryNodes 这是之前的集群信息中保存的节点,contextProvider实际是ZenDisovery
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// 3. 遍历节点,获取所有的master节点,加入到seed list
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
seedAddresses.add(masterNode.value.getAddress());
}
//4. 构建PingingRound ,用来进行Ping操作
final ConnectionProfile connectionProfile =
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
//5. 构建一个Runnable对象,作为Ping操作的发送器
final AbstractRunnable pingSender = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (e instanceof AlreadyClosedException == false) {
logger.warn("unexpected error while pinging", e);
}
}
@Override
protected void doRun() throws Exception {
sendPings(requestDuration, pingingRound);
}
};
//先执行一次 ping操作
threadPool.generic().execute(pingSender);
//加入定时任务延时一定时间之后再执行。
threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
// 结束Ping操作
threadPool.schedule(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
finishPingingRound(pingingRound);
}
@Override
public void onFailure(Exception e) {
logger.warn("unexpected error while finishing pinging round", e);
}
}, scheduleDuration, ThreadPool.Names.GENERIC);
}
网友评论