Elasticsearch 5.x Source Code Analysis (2) Transpo

Author: 华安火车迷 | Published: 2017-05-31 18:49 | Read 4493 times

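    Questions

    • Which two kinds of Client does ES provide?
    • For each of them, how many connections are established, and to how many Nodes?
    • Can both Clients obtain all the Nodes of the Cluster?
    • Are the other Nodes aware that the Client exists? (Unanswered; to be updated)

    This chapter is about the Client. First we look at the source code of TransportClient; if you have read the previous article, you will notice that initializing a TransportClient is very similar to initializing a Node. Then we look at RestClient, new in ES 5.x, a somewhat simpler and more lightweight Client (the key points being that it does not need to keep long-lived connections to the Nodes, and that it is not tightly coupled to the ES version).
    This article only covers how a Client is initialized and how it maintains its relationship with the other Nodes; how to issue queries through the Client is out of scope.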


    TransportClient

    First, here are the key parameters for creating a Client, as given in the official Elasticsearch documentation:

    // Snippet 1: the addresses to connect to (the snippets below are separate examples)
    TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
    
    // on shutdown
    
    client.close();
    
    // Snippet 2: the name of the cluster to connect to
    Settings settings = Settings.builder()
            .put("cluster.name", "myClusterName").build();
    TransportClient client = new PreBuiltTransportClient(settings);
    //Add transport addresses and do something with the client...
    
    // Snippet 3: enable sniffing
    Settings settings = Settings.builder()
            .put("client.transport.sniff", true).build();
    TransportClient client = new PreBuiltTransportClient(settings);
    

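    The most important things when creating a Client are the ones listed above:

    • the IP addresses to connect to
    • the cluster to connect to
    • whether to use sniffing

    PreBuiltTransportClient is essentially just a builder-style class for TransportClient; it has practically no methods of its own:
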
    public class PreBuiltTransportClient extends TransportClient {
    ...
        /**
         * Creates a new transport client with pre-installed plugins.
         *
         * @param settings            the settings passed to this transport client
         * @param plugins             a collection of additional plugins to run with this client
         * @param hostFailureListener a failure listener that is invoked if a node is disconnected; this can be <code>null</code>
         */
        public PreBuiltTransportClient(
            Settings settings,
            Collection<Class<? extends Plugin>> plugins,
            HostFailureListener hostFailureListener) {
            super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener);
        }
    

    Going straight into the TransportClient class, the first thing we see is four important settings:

        public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
            Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope);
        public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
            Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope);
        public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
            Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope);
        public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
            Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
    

    Both nodes_sampler_interval and ping_timeout default to 5s. The construction logic (buildTemplate) is very similar to the initialization of a Node:

    private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
                                                    Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
            if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
                providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
            }
            final PluginsService pluginsService = newPluginService(providedSettings, plugins);
            final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
            final List<Closeable> resourcesToClose = new ArrayList<>();
            final ThreadPool threadPool = new ThreadPool(settings);
            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
            final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
            try {
                final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
                final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
                for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                    additionalSettings.addAll(builder.getRegisteredSettings());
                }
                SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
    
                SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
                List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
                entries.addAll(NetworkModule.getNamedWriteables());
                entries.addAll(searchModule.getNamedWriteables());
                entries.addAll(ClusterModule.getNamedWriteables());
                entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
                                             .flatMap(p -> p.getNamedWriteables().stream())
                                             .collect(Collectors.toList()));
                NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
                NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                        searchModule.getNamedXContents().stream(),
                        pluginsService.filterPlugins(Plugin.class).stream()
                                .flatMap(p -> p.getNamedXContent().stream())
                        ).flatMap(Function.identity()).collect(toList()));
    
                ModulesBuilder modules = new ModulesBuilder();
                // plugin modules must be added here, before others or we can get crazy injection errors...
                for (Module pluginModule : pluginsService.createGuiceModules()) {
                    modules.add(pluginModule);
                }
                modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
                ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
                        settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
                        pluginsService.filterPlugins(ActionPlugin.class), null, null);
                modules.add(actionModule);
    
                CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
                    settingsModule.getClusterSettings());
                resourcesToClose.add(circuitBreakerService);
                BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
                resourcesToClose.add(bigArrays);
                modules.add(settingsModule);
                NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
                    bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
                final Transport transport = networkModule.getTransportSupplier().get();
                final TransportService transportService = new TransportService(settings, transport, threadPool,
                    networkModule.getTransportInterceptor(),
                    boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
                        UUIDs.randomBase64UUID()), null);
                modules.add((b -> {
                    b.bind(BigArrays.class).toInstance(bigArrays);
                    b.bind(PluginsService.class).toInstance(pluginsService);
                    b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                    b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                    b.bind(Transport.class).toInstance(transport);
                    b.bind(TransportService.class).toInstance(transportService);
                    b.bind(NetworkService.class).toInstance(networkService);
                }));
    
                Injector injector = modules.createInjector();
                final TransportClientNodesService nodesService =
                    new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
                        ? (t, e) -> {} : failureListner);
                final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
                    actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
    
                List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream()
                    .map(injector::getInstance).collect(Collectors.toList()));
                resourcesToClose.addAll(pluginLifecycleComponents);
    
                transportService.start();
                transportService.acceptIncomingRequests();
    
                ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
                resourcesToClose.clear();
                return transportClient;
            } finally {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    

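    Of course, a Client does not initialize everything a Node does, but the important pieces are all there, such as threadPool, NetworkModule, TransportService and so on. At the end a ClientTemplate is created; this class wraps five fields: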

    private static final class ClientTemplate {
            final Injector injector;
            private final List<LifecycleComponent> pluginLifecycleComponents;
            private final TransportClientNodesService nodesService;
            private final TransportProxyClient proxy;
            private final NamedWriteableRegistry namedWriteableRegistry;
    
            private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
                    TransportClientNodesService nodesService, TransportProxyClient proxy, NamedWriteableRegistry namedWriteableRegistry) {
                this.injector = injector;
                this.pluginLifecycleComponents = pluginLifecycleComponents;
                this.nodesService = nodesService;
                this.proxy = proxy;
                this.namedWriteableRegistry = namedWriteableRegistry;
            }
    
            Settings getSettings() {
                return injector.getInstance(Settings.class);
            }
    
            ThreadPool getThreadPool() {
                return injector.getInstance(ThreadPool.class);
            }
        }
    

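    injector needs no explanation: it manages the bean dependencies. proxy is covered later. As for namedWriteableRegistry, I do not yet know exactly what it is; it seems to be related to serializing requests and responses, and I will come back and update this once I understand it. For now, let's look at nodesService. It is a TransportClientNodesService; put simply, it is what the Client uses to discover the surrounding Nodes and manage communication with them, and it should play a role similar to NodeService.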

       TransportClientNodesService(Settings settings, TransportService transportService,
                                           ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
            super(settings);
            this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            this.transportService = transportService;
            this.threadPool = threadPool;
            this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
    
            this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
            this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
            this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);
    
            if (logger.isDebugEnabled()) {
                logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
            }
    
            if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
                this.nodesSampler = new SniffNodesSampler();
            } else {
                this.nodesSampler = new SimpleNodeSampler();
            }
            this.hostFailureListener = hostFailureListener;
            this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
        }
    

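    This is where the important sniff setting is used: depending on the configuration, either a SniffNodesSampler or a SimpleNodeSampler is created. Note that a scheduled task is also set up at the end, which pings the other nodes at the configured interval, e.g. every 5s.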
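
    To make the two ideas in that constructor concrete (pick a sampler from a flag, then run it periodically), here is a minimal plain-Java sketch. It is not the Elasticsearch code: Sampler, choose and runRounds are hypothetical names, a JDK ScheduledExecutorService stands in for ES's ThreadPool, and the assumption that the task re-arms itself after each run is mine, not something shown in the listing above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SamplerSchedulingSketch {
    /** Hypothetical stand-in for the NodeSampler hierarchy. */
    interface Sampler {
        void sample();
    }

    /** Mirrors the if/else on client.transport.sniff in the constructor. */
    static Sampler choose(boolean sniff, Sampler sniffSampler, Sampler simpleSampler) {
        return sniff ? sniffSampler : simpleSampler;
    }

    /**
     * Runs the sampler every intervalMillis until it has run `rounds` times (rounds >= 1),
     * re-scheduling the task from inside itself (an assumption of this sketch).
     * Returns the number of completed rounds.
     */
    static int runRounds(Sampler sampler, long intervalMillis, int rounds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger done = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    sampler.sample();
                } finally {
                    if (done.incrementAndGet() < rounds) {
                        // not finished yet: arm the next round
                        scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
                    } else {
                        finished.countDown();
                    }
                }
            }
        };
        scheduler.schedule(task, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return done.get();
    }
}
```

    A real client would keep sampling until it is closed rather than stop after a fixed number of rounds; the round limit is only there so the sketch terminates.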

        class SimpleNodeSampler extends NodeSampler {
    
            @Override
            protected void doSample() {
                HashSet<DiscoveryNode> newNodes = new HashSet<>();
                HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
                for (DiscoveryNode listedNode : listedNodes) {
                    try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
                        final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
                            new FutureTransportResponseHandler<LivenessResponse>() {
                                @Override
                                public LivenessResponse newInstance() {
                                    return new LivenessResponse();
                                }
                            });
                        transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
                            TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                            handler);
                        final LivenessResponse livenessResponse = handler.txGet();
                        if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
                            logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
                            newFilteredNodes.add(listedNode);
                        } else {
                            // use discovered information but do keep the original transport address,
                            // so people can control which address is exactly used.
                            DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
                            newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
                                nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
                                nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
                        }
                    } catch (ConnectTransportException e) {
                        logger.debug(
                            (Supplier<?>)
                                () -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
                        hostFailureListener.onNodeDisconnected(listedNode, e);
                    } catch (Exception e) {
                        logger.info(
                            (Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
                    }
                }
    
                nodes = validateNewNodes(newNodes);
                filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
            }
        }
    

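    SimpleNodeSampler is straightforward: it sends a request of type STATE (here a liveness request) to each of the configured listedNodes. Note that the connection is obtained through TransportService (backed by Netty4Transport underneath), and the work runs on the GENERIC thread pool. Every Node that was reached successfully is saved, and only one connection is kept per Node.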
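
    The decision logic of doSample() can be boiled down to a few lines. The following is only a sketch with plain strings instead of DiscoveryNode: the ping function is a hypothetical stand-in for the liveness request (it returns the cluster name a host reports, or empty if the host is unreachable), and Result is an invented holder for the two lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class SimpleSamplingSketch {
    /** Outcome of one sampling round (hypothetical holder type). */
    static final class Result {
        final List<String> nodes = new ArrayList<>();          // reachable and in our cluster
        final List<String> filteredNodes = new ArrayList<>();  // reachable but in another cluster
    }

    /**
     * Pings every listed node once and sorts it into nodes / filteredNodes.
     * Unreachable nodes end up in neither list and are simply retried next round.
     */
    static Result sample(List<String> listedNodes, String clusterName, boolean ignoreClusterName,
                         Function<String, Optional<String>> ping) {
        Result result = new Result();
        for (String listed : listedNodes) {
            Optional<String> reportedCluster = ping.apply(listed);
            if (!reportedCluster.isPresent()) {
                continue; // connection failed: skip for this round
            }
            if (!ignoreClusterName && !clusterName.equals(reportedCluster.get())) {
                result.filteredNodes.add(listed); // wrong cluster: remember it, but do not use it
            } else {
                result.nodes.add(listed);         // keep the address exactly as it was configured
            }
        }
        return result;
    }
}
```

    The point to take away is that this sampler never learns about nodes it was not told about: the output can only be a subset of listedNodes.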
    Next, let's look at SniffNodesSampler:

    class SniffNodesSampler extends NodeSampler {
    
            @Override
            protected void doSample() {
                // the nodes we are going to ping include the core listed nodes that were added
                // and the last round of discovered nodes
                Set<DiscoveryNode> nodesToPing = new HashSet<>();
                for (DiscoveryNode node : listedNodes) {
                    nodesToPing.add(node);
                }
                for (DiscoveryNode node : nodes) {
                    nodesToPing.add(node);
                }
    
                final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
                final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
                try {
                    for (final DiscoveryNode nodeToPing : nodesToPing) {
                        threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
    
    

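    Here too, the first step is to ping every listedNode (plus the nodes discovered in the previous round); note that this time the MANAGEMENT thread pool is used.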

                           @Override
                            protected void doRun() throws Exception {
                                Transport.Connection pingConnection = null;
                                if (nodes.contains(nodeToPing)) {
                                    try {
                                        pingConnection = transportService.getConnection(nodeToPing);
                                    } catch (NodeNotConnectedException e) {
                                        // will use a temp connection
                                    }
                                }
                                if (pingConnection == null) {
                                    logger.trace("connecting to cluster node [{}]", nodeToPing);
                                    connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
                                    pingConnection = connectionToClose;
                                }
                                transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
                                    Requests.clusterStateRequest().clear().nodes(true).local(true),
                                    TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
                                        .withTimeout(pingTimeout).build(),
                                    new TransportResponseHandler<ClusterStateResponse>() {
    
                                        @Override
                                        public ClusterStateResponse newInstance() {
                                            return new ClusterStateResponse();
                                        }
    
                                        @Override
                                        public String executor() {
                                            return ThreadPool.Names.SAME;
                                        }
    
                                        @Override
                                        public void handleResponse(ClusterStateResponse response) {
                                            clusterStateResponses.put(nodeToPing, response);
                                            onDone();
                                        }
    
                                        @Override
                                        public void handleException(TransportException e) {
                                            logger.info(
                                                (Supplier<?>) () -> new ParameterizedMessage(
                                                    "failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
                                            try {
                                                hostFailureListener.onNodeDisconnected(nodeToPing, e);
                                            } finally {
                                                onDone();
                                            }
                                        }
                                    });
                            }
    

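    Here too the connection is initiated through TransportService. Pay special attention: in this mode a whole set of connections is established, connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE); for example a certain number of connections per type, so in this mode a Client keeps a bunch of connections to each Node. The callbacks are simple: successes and failures are sorted accordingly, and the cluster state sent back by each node is saved with clusterStateResponses.put(nodeToPing, response);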

    HashSet<DiscoveryNode> newNodes = new HashSet<>();
                HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
                for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
                    if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                        logger.warn("node {} not part of the cluster {}, ignoring...",
                                entry.getValue().getState().nodes().getLocalNode(), clusterName);
                        newFilteredNodes.add(entry.getKey());
                        continue;
                    }
                    for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
                        newNodes.add(cursor.value);
                    }
                }
    
                nodes = validateNewNodes(newNodes);
                filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
    

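    Finally the responses are aggregated and all nodes are verified once more (validateNewNodes) before being stored. In effect, nodes now holds the nodes of the whole cluster (more precisely, as the code shows, the data nodes listed in each returned cluster state); from here on, the scheduler simply pings them at every interval.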
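
    The whole sniffing round (fan out in parallel, wait on a latch, then union what every node reported) can be sketched with JDK classes only. Everything named here is illustrative: fetchDataNodes is a hypothetical stand-in for the cluster-state request and throws when a node is unreachable, nodes are plain strings, and a fixed pool of 4 threads replaces the MANAGEMENT pool.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class SniffSamplingSketch {
    /** Returns the union of the data nodes reported by every reachable node. */
    static Set<String> sniff(Set<String> listedNodes, Set<String> knownNodes,
                             Function<String, Set<String>> fetchDataNodes) {
        // ping the configured nodes plus whatever the previous round discovered
        Set<String> nodesToPing = new HashSet<>(listedNodes);
        nodesToPing.addAll(knownNodes);

        ConcurrentMap<String, Set<String>> responses = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String node : nodesToPing) {
                pool.execute(() -> {
                    try {
                        responses.put(node, fetchDataNodes.apply(node));
                    } catch (RuntimeException e) {
                        // unreachable node: it contributes nothing in this round
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            latch.await(); // wait until every ping has either answered or failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }

        Set<String> newNodes = new HashSet<>();
        for (Set<String> reported : responses.values()) {
            newNodes.addAll(reported);
        }
        return newNodes;
    }
}
```

    Unlike the simple sampler, the result here can contain nodes that were never configured, which is the whole point of sniffing.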
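    As the sampler code shows, the connections are already established at this point, so for any later request the client only needs to take a connection (or a connection of a given type) to a node and send the request. This is where the proxy mentioned earlier comes in; let's go back and see what proxy is.
    proxy is created in buildTemplate and handed to ClientTemplate: final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
    It holds only two fields, nodesService and proxies: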

    private final TransportClientNodesService nodesService;
        private final Map<Action, TransportActionNodeProxy> proxies;
    
        TransportProxyClient(Settings settings, TransportService transportService,
                                    TransportClientNodesService nodesService, List<GenericAction> actions) {
            this.nodesService = nodesService;
            Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
            for (GenericAction action : actions) {
                if (action instanceof Action) {
                    proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
                }
            }
            this.proxies = unmodifiableMap(proxies);
        }
    
        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
            ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                                  final Request request, ActionListener<Response> listener) {
            final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
            nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
        }
    

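    The call is simple as well: look up the proxy for the action, wrap proxy.execute() in a function, and hand that function to nodesService to run. Put plainly, proxies records how a request is executed for each kind of action, and nodesService then picks a node to send it to (the starting index comes from getNodeNumber(), and RetryListener can move on to another node on failure):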

        public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
            // we first read nodes before checking the closed state; this
            // is because otherwise we could be subject to a race where we
            // read the state as not being closed, and then the client is
            // closed and the nodes list is cleared, and then a
            // NoNodeAvailableException is thrown
            // it is important that the order of first setting the state of
            // closed and then clearing the list of nodes is maintained in
            // the close method
            final List<DiscoveryNode> nodes = this.nodes;
            if (closed) {
                throw new IllegalStateException("transport client is closed");
            }
            ensureNodesAreAvailable(nodes);
            int index = getNodeNumber();
            RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
            DiscoveryNode node = retryListener.getNode(0);
            try {
                callback.doWithNode(node, retryListener);
            } catch (Exception e) {
                try {
                    //this exception can't come from the TransportService as it doesn't throw exception at all
                    listener.onFailure(e);
                } finally {
                    retryListener.maybeNodeFailed(node, e);
                }
            }
        }
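
    As a rough illustration of "pick a node by index, fall through to the next one on failure", here is a synchronous plain-Java sketch. It is deliberately simpler than the real thing: the ES code above is asynchronous and listener-based, whereas this version just calls a function and catches exceptions. NodeSelectionSketch and its counter are hypothetical, and the assumption that the index advances by one per request is mine.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class NodeSelectionSketch {
    // assumed: a counter that advances once per request, so requests rotate over the nodes
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Calls `call` with a node chosen by a rotating index; if the call throws,
     * tries the following nodes in order until one succeeds or all have failed.
     */
    <T> T execute(List<String> nodes, Function<String, T> call) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no node available");
        }
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        RuntimeException last = null;
        for (int i = 0; i < nodes.size(); i++) {
            String node = nodes.get((index + i) % nodes.size());
            try {
                return call.apply(node);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next node
            }
        }
        throw last; // every node failed; surface the most recent error
    }
}
```

    With three nodes n1, n2, n3 and no failures, successive calls land on n1, n2, n3, n1 and so on; if the chosen node throws, the same request is retried on the next one.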
    

    RestClient

    RestClient could hardly be simpler; reading the first 100 lines of its source code covers most of it:

    public class RestClient implements Closeable {
    
        private static final Log logger = LogFactory.getLog(RestClient.class);
    
        private final CloseableHttpAsyncClient client;
        //we don't rely on default headers supported by HttpAsyncClient as those cannot be replaced
        private final Header[] defaultHeaders;
        private final long maxRetryTimeoutMillis;
        private final String pathPrefix;
        private final AtomicInteger lastHostIndex = new AtomicInteger(0);
        private volatile HostTuple<Set<HttpHost>> hostTuple;
        private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
        private final FailureListener failureListener;
    
        RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
                   HttpHost[] hosts, String pathPrefix, FailureListener failureListener) {
            this.client = client;
            this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
            this.defaultHeaders = defaultHeaders;
            this.failureListener = failureListener;
            this.pathPrefix = pathPrefix;
            setHosts(hosts);
        }
    
        /**
         * Returns a new {@link RestClientBuilder} to help with {@link RestClient} creation.
         */
        public static RestClientBuilder builder(HttpHost... hosts) {
            return new RestClientBuilder(hosts);
        }
    
        /**
         * Replaces the hosts that the client communicates with.
         * @see HttpHost
         */
        public synchronized void setHosts(HttpHost... hosts) {
            if (hosts == null || hosts.length == 0) {
                throw new IllegalArgumentException("hosts must not be null nor empty");
            }
            Set<HttpHost> httpHosts = new HashSet<>();
            AuthCache authCache = new BasicAuthCache();
            for (HttpHost host : hosts) {
                Objects.requireNonNull(host, "host cannot be null");
                httpHosts.add(host);
                authCache.put(host, new BasicScheme());
            }
            this.hostTuple = new HostTuple<>(Collections.unmodifiableSet(httpHosts), authCache);
            this.blacklist.clear();
        }
    

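    The biggest difference from TransportClient is that no sniffing happens here: the client only maintains the hosts that were configured, storing an authCache entry for each one (for use when authentication is required), and everything else, i.e. every request, is delegated to the Apache HTTP client.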
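
    The fields lastHostIndex and blacklist in the listing hint at how a host is chosen per request. The sketch below shows one plausible way such rotation could work; it is an assumption-laden simplification rather than the RestClient implementation: hosts are plain strings, markDead and nextHost are hypothetical names, and dead hosts are never revived except through setHosts().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class HostRotationSketch {
    private final AtomicInteger lastHostIndex = new AtomicInteger(0);
    private final Set<String> blacklist = ConcurrentHashMap.newKeySet();
    private volatile List<String> hosts;

    HostRotationSketch(String... hosts) {
        setHosts(hosts);
    }

    /** Replaces the host list and forgets earlier failures, like setHosts() in the listing. */
    synchronized void setHosts(String... newHosts) {
        if (newHosts == null || newHosts.length == 0) {
            throw new IllegalArgumentException("hosts must not be null nor empty");
        }
        // LinkedHashSet drops duplicates while keeping the configured order
        this.hosts = new ArrayList<>(new LinkedHashSet<>(Arrays.asList(newHosts)));
        this.blacklist.clear();
    }

    /** Hypothetical hook: called when a request to this host failed. */
    void markDead(String host) {
        blacklist.add(host);
    }

    /** Next live host in rotation; falls back to all hosts if every one is marked dead. */
    String nextHost() {
        List<String> snapshot = hosts;
        List<String> live = new ArrayList<>(snapshot);
        live.removeAll(blacklist);
        List<String> candidates = live.isEmpty() ? snapshot : live;
        return candidates.get(Math.floorMod(lastHostIndex.getAndIncrement(), candidates.size()));
    }
}
```

    Note that nothing here discovers new hosts: as in the article, the list only changes when someone calls setHosts().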


    The Sniffer component

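    So what if you really like RestClient but also want sniffing? The ES team thoughtfully provides a small add-on: bind it to your client, and it will sniff out all the machines in the cluster and call back into the client's setHosts():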

    void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
                if (running.compareAndSet(false, true)) {
                    try {
                        List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
                        logger.debug("sniffed hosts: " + sniffedHosts);
                        if (excludeHost != null) {
                            sniffedHosts.remove(excludeHost);
                        }
                        if (sniffedHosts.isEmpty()) {
                            logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
                        } else {
                            this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
                        }
                    } catch (Exception e) {
                        logger.error("error while sniffing nodes", e);
                    } finally {
                        scheduleNextRun(nextSniffDelayMillis);
                        running.set(false);
                    }
                }
            }
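
    Two details of sniff() are worth isolating: the compareAndSet guard that keeps two rounds from overlapping, and the rule that an empty result never replaces the current hosts. Here is a stripped-down sketch of just those two ideas. The Supplier and Consumer are hypothetical stand-ins for hostsSniffer.sniffHosts() and restClient.setHosts(...), and the re-scheduling step is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SnifferGuardSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Supplier<List<String>> hostsSniffer; // hypothetical: asks the cluster for its hosts
    private final Consumer<List<String>> setHosts;     // hypothetical: stands in for restClient.setHosts(...)

    SnifferGuardSketch(Supplier<List<String>> hostsSniffer, Consumer<List<String>> setHosts) {
        this.hostsSniffer = hostsSniffer;
        this.setHosts = setHosts;
    }

    /** Returns true if this call ran a sniffing round, false if another round was in progress. */
    boolean sniff(String excludeHost) {
        if (!running.compareAndSet(false, true)) {
            return false; // someone else is already sniffing
        }
        try {
            List<String> sniffed = new ArrayList<>(hostsSniffer.get());
            if (excludeHost != null) {
                sniffed.remove(excludeHost); // e.g. the host that just failed
            }
            if (!sniffed.isEmpty()) {
                setHosts.accept(sniffed);    // only replace the hosts when there is something to set
            }
        } catch (RuntimeException e) {
            // keep the previous hosts; the next round gets another chance
        } finally {
            running.set(false);
        }
        return true;
    }
}
```

    Because the guard is released in finally, a failed round cannot leave the sniffer permanently stuck in the running state.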
    

    End of this article.

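      Reader comments

      • 小潘童鞋: Where is the Sniffer component? :smile:
        1552981fb36e: Using ES's HttpClient under high concurrency performs poorly for me, taking far longer than the took time ES reports. Could you explain why?
        小潘童鞋: @华安火车迷 Thanks
        华安火车迷: @小潘童鞋 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/sniffer.html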

      Article link: https://www.haomeiwen.com/subject/koyjfxtx.html