美文网首页
Elasticsearch的Ping机制

Elasticsearch的Ping机制

作者: Ombres | 来源:发表于2019-07-09 19:08 被阅读0次

    简介

    在1.x的时候,Elasticsearch默认的集群通信是多播方式,也支持单播,而从2.x开始,默认是单播模式。到目前的7.x,多播模式以及不再使用,而且源码中不再包含有多播的方式。

    Ping机制

    ZenDiscovery中ZenPing是用来进行节点通信探测的接口,目前7.x其实现类是UnicastZenPing

    ZenPing

    包括几个内部工具类

    1. PingCollection 收集Ping通信的结果
    2. PingResponse Ping通信的结果

    提供两个方法

    1. start() 启动服务
    2. void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout); 提供ping消息通信的接口方法

    UnicastZenPing

    构造器

    主要干了以下几件事:

    1. transportService注册一个RequestHandler,使用SAME线程池
    2. 创建Ping机制的守护线程池,这里使用的是EsExecutors.newScaling()

    对外方法

    主要实现ZenPing接口的两个方法

    1. start() 实际没有进行任何操作。
    2. 提供ping的基础操作。这个方法的调用时在ZenDiscovery中,是在master不存在的时候进行调用。

    Ping流程

    1. 获取所有的seed地址,这里首先获取配置中的所有的seed list
    2. 获取所有的DiscoveryNodes 这是之前的集群信息中保存的节点,contextProvider实际是ZenDisovery
    3. 遍历节点,获取所有的master节点,加入到seed list
    4. 构建PingingRound ,用来进行Ping操作
    5. 构建一个Runnable对象,作为Ping操作的发送器
    6. 在线程池中进行三次ping发送请求,三次有间隔

    主要流程见以下代码,具体逻辑就不展开一一细说了:

      protected void ping(final Consumer<PingCollection> resultsConsumer,
                            final TimeValue scheduleDuration,
                            final TimeValue requestDuration) {
            //1. 获取所有的seed地址,这里首先获取配置中的所有的seed
            final List<TransportAddress> seedAddresses = new ArrayList<>();
            seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
            // 2. 获取所有的DiscoveryNodes 这是之前的集群信息中保存的节点,contextProvider实际是ZenDisovery
            final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
            // 3. 遍历节点,获取所有的master节点,加入到seed list
            for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
                seedAddresses.add(masterNode.value.getAddress());
            }
            //4.  构建PingingRound ,用来进行Ping操作
            final ConnectionProfile connectionProfile =
                ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
            final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
                nodes.getLocalNode(), connectionProfile);
            activePingingRounds.put(pingingRound.id(), pingingRound);
            //5. 构建一个Runnable对象,作为Ping操作的发送器
            final AbstractRunnable pingSender = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    if (e instanceof AlreadyClosedException == false) {
                        logger.warn("unexpected error while pinging", e);
                    }
                }
    
                @Override
                protected void doRun() throws Exception {
                    sendPings(requestDuration, pingingRound);
                }
            };
            //先执行一次 ping操作
            threadPool.generic().execute(pingSender);
            //加入定时任务延时一定时间之后再执行。
            threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
            threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
            // 结束Ping操作
            threadPool.schedule(new AbstractRunnable() {
                @Override
                protected void doRun() throws Exception {
                    finishPingingRound(pingingRound);
                }
    
                @Override
                public void onFailure(Exception e) {
                    logger.warn("unexpected error while finishing pinging round", e);
                }
            }, scheduleDuration, ThreadPool.Names.GENERIC);
        }
    

    相关文章

      网友评论

          本文标题:Elasticsearch的Ping机制

          本文链接:https://www.haomeiwen.com/subject/jnmfkctx.html