Elasticsearch Client Source Code Walkthrough, Part 1

Author: kason_zhang | Published: 2018-05-29 14:31

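    Reading the ES source code requires a basic understanding of the Guice dependency injection framework; a little background is enough to follow along. This post also gives a basic look at how ES uses Guice. The ES source version covered here is 2.4.1.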

    1 Initializing TransportClient

    When working with Elasticsearch, we first create a Client instance, as in the following code:

    private static Client getClient(){
            Settings settings = Settings.builder().put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
                .put("client.transport.ping_timeout", "1200s").build();
            Client client = null;
            try {
                client = TransportClient.builder().settings(settings).build().addTransportAddress
                    (new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return client;
        }
    

    This instantiates a TransportClient object. Let's look at the source of its build() method:

    public TransportClient build() {
                // omitted...
                final ThreadPool threadPool = new ThreadPool(settings);
                NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
    
                boolean success = false;
                try {
                    ModulesBuilder modules = new ModulesBuilder();
                    modules.add(new Version.Module(version));
                    // plugin modules must be added here, before others or we can get crazy injection errors...
                    for (Module pluginModule : pluginsService.nodeModules()) {
                        modules.add(pluginModule);
                    }
                    modules.add(new PluginsModule(pluginsService));
                    modules.add(new SettingsModule(this.settings));
                    modules.add(new NetworkModule(namedWriteableRegistry));
                    modules.add(new ClusterNameModule(this.settings));
                    modules.add(new ThreadPoolModule(threadPool));
                    modules.add(new TransportModule(this.settings, namedWriteableRegistry));
                    modules.add(new SearchModule() {
                        @Override
                        protected void configure() {
                            // noop
                        }
                    });
                    modules.add(new ActionModule(true));
                    modules.add(new ClientTransportModule(hostFailedListener));
                    modules.add(new CircuitBreakerModule(this.settings));
    
                    pluginsService.processModules(modules);
    
                    Injector injector = modules.createInjector();
                    final TransportService transportService = injector.getInstance(TransportService.class);
                    transportService.start();
                    transportService.acceptIncomingRequests();
    
                    TransportClient transportClient = new TransportClient(injector); // instantiate the TransportClient
                    success = true;
                    return transportClient;
                } finally {
                    if (!success) {
                        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
                    }
                }
            }
    

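    As the code shows, build() first uses Guice to wire up the dependencies of the relevant modules. For example, the Actions behind index, delete, update, and get requests are concrete implementations registered by ActionModule; each module handles a different responsibility. After that, TransportClient transportClient = new TransportClient(injector) instantiates the TransportClient object. Here is its constructor: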

    private TransportClient(Injector injector) {
            super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
            this.injector = injector;
            nodesService = injector.getInstance(TransportClientNodesService.class);
            proxy = injector.getInstance(TransportProxyClient.class); // the proxy class that actually performs execute
        }
    

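    nodesService is the class that manages node operations, and proxy is the proxy class that actually carries out the execute method. So let's look at TransportProxyClient next.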
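
    To make the module-and-injector mechanics of this section more concrete, here is a minimal sketch in plain Java. It is not Guice and not ES code: ToyModule, ToyInjector, ToySettings, and ToyThreadPool are hypothetical names invented for illustration, and the toy injector only looks up pre-built instances, whereas real Guice also constructs objects and resolves their constructor dependencies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a Guice module: it only records type -> instance bindings. */
interface ToyModule {
    void configure(Map<Class<?>, Object> bindings);
}

/** Toy stand-in for a Guice Injector (hypothetical, lookup only). */
final class ToyInjector {
    private final Map<Class<?>, Object> bindings = new HashMap<>();

    ToyInjector(List<ToyModule> modules) {
        // Each module contributes its bindings, like modules.createInjector().
        for (ToyModule module : modules) {
            module.configure(bindings);
        }
    }

    <T> T getInstance(Class<T> type) {
        Object instance = bindings.get(type);
        if (instance == null) {
            throw new IllegalStateException("No binding for " + type.getName());
        }
        return type.cast(instance);
    }
}

/** Hypothetical placeholder for a settings object. */
final class ToySettings {
    final String clusterName;
    ToySettings(String clusterName) { this.clusterName = clusterName; }
}

/** Hypothetical placeholder for a thread pool. */
final class ToyThreadPool {
    final String name;
    ToyThreadPool(String name) { this.name = name; }
}

final class ToyInjectorDemo {
    static ToyInjector build() {
        List<ToyModule> modules = new ArrayList<>();
        // One module per concern, mirroring SettingsModule / ThreadPoolModule.
        modules.add(b -> b.put(ToySettings.class, new ToySettings("demo-cluster")));
        modules.add(b -> b.put(ToyThreadPool.class, new ToyThreadPool("client-pool")));
        return new ToyInjector(modules);
    }

    public static void main(String[] args) {
        ToyInjector injector = build();
        // Same shape as injector.getInstance(Settings.class) in the constructor.
        System.out.println(injector.getInstance(ToySettings.class).clusterName);
        System.out.println(injector.getInstance(ToyThreadPool.class).name);
    }
}
```

    The point of the sketch is the order of operations: modules are collected first, the injector is created once, and only then are concrete objects pulled out by type, which is the same order build() and the TransportClient constructor follow.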

    2 TransportProxyClient

    @Inject
        public TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, Map<String, GenericAction> actions) {
            this.nodesService = nodesService;
            MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
            for (GenericAction action : actions.values()) {
                if (action instanceof Action) {
                    actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
                }
            }
            this.proxies = actionsBuilder.immutableMap();
        }
    

    TransportProxyClient uses constructor injection. Settings is bound by SettingsModule, TransportService by TransportModule, TransportClientNodesService by ClientTransportModule, and Map<String, GenericAction> actions is produced by the bindings in ActionModule. The proxies field is the important one: it is a Map whose keys are Actions and whose values are TransportActionNodeProxy instances. So what are these Actions?
    Recall that initializing TransportClient binds many modules, one of which is ActionModule. Its configure method looks like this:

    protected void configure() {
    
            Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
            for (Class<? extends ActionFilter> actionFilter : actionFilters) {
                actionFilterMultibinder.addBinding().to(actionFilter);
            }
            bind(ActionFilters.class).asEagerSingleton();
            bind(AutoCreateIndex.class).asEagerSingleton();
            bind(DestructiveOperations.class).asEagerSingleton();
            registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
            registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
            registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
            registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
            registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
    
            registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
            registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
            registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
            registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
            registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
            registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
            registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
            registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
            registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
            registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
            registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
            registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
            registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
            registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
            registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
            registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
    
            registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
            registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
            registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
            registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
            registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
            registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
            registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
            registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
            registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
            registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
            registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
            registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
            registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
            registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
            registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
            registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
            registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
            registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
            registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
            registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
            registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
            registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
            registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
            registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
            registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
            registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
            registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
            registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
            registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
            registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
            registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
            registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
            registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
            registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
    
            registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
            registerAction(GetAction.INSTANCE, TransportGetAction.class);
            registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
                    TransportDfsOnlyAction.class);
            registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
                    TransportShardMultiTermsVectorAction.class);
            registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
            registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
            registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
            registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
            registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
                    TransportShardMultiGetAction.class);
            registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
                    TransportShardBulkAction.class);
            registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
            registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
            registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
            registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
            registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
            registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
            registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
            registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
            registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);
    
            //Indexed scripts
            registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
            registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
            registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);
    
            registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
    
            // register Name -> GenericAction Map that can be injected to instances.
            MapBinder<String, GenericAction> actionsBinder
                    = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
    
            for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
                actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
            }
    // ... omitted
    }
    
    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction, Class... supportTransportActions) {
            actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
        }
    

    As shown above, ActionModule registers each Action through registerAction, which adds it to the actions Map. A MapBinder<String, GenericAction> then makes that data injectable into other instances, so the map is available when TransportProxyClient is constructed:

    // register Name -> GenericAction Map that can be injected to instances.
    MapBinder<String, GenericAction> actionsBinder
                    = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
    
            for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
                actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
            }
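
    The register-then-inject flow can be sketched without Guice. The classes below (SketchGenericAction, SketchAction, SketchNodeProxy, SketchActionRegistry, SketchProxyClient) and the action names are hypothetical stand-ins, not the ES types; a plain unmodifiable Map takes the place of what MapBinder provides in the real code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for GenericAction: just a named action. */
class SketchGenericAction {
    final String name;
    SketchGenericAction(String name) { this.name = name; }
}

/** Hypothetical stand-in for Action, the subtype a client can execute. */
class SketchAction extends SketchGenericAction {
    SketchAction(String name) { super(name); }
}

/** Hypothetical stand-in for TransportActionNodeProxy. */
class SketchNodeProxy {
    final SketchAction action;
    SketchNodeProxy(SketchAction action) { this.action = action; }
}

/** Plays the role of ActionModule: collects actions keyed by name. */
class SketchActionRegistry {
    private final Map<String, SketchGenericAction> actions = new LinkedHashMap<>();

    void registerAction(SketchGenericAction action) {
        actions.put(action.name, action);
    }

    /** In the real code a Guice MapBinder exposes this map for injection. */
    Map<String, SketchGenericAction> asInjectableMap() {
        return Collections.unmodifiableMap(actions);
    }
}

/** Plays the role of TransportProxyClient: one proxy per executable action. */
class SketchProxyClient {
    final Map<SketchAction, SketchNodeProxy> proxies;

    SketchProxyClient(Map<String, SketchGenericAction> actions) {
        Map<SketchAction, SketchNodeProxy> built = new LinkedHashMap<>();
        for (SketchGenericAction action : actions.values()) {
            // Same filter as the real constructor: only Action subtypes get a proxy.
            if (action instanceof SketchAction) {
                SketchAction executable = (SketchAction) action;
                built.put(executable, new SketchNodeProxy(executable));
            }
        }
        this.proxies = Collections.unmodifiableMap(built);
    }
}

class SketchRegistryDemo {
    // Illustrative names only; they are not taken from the ES source.
    static final SketchAction GET = new SketchAction("sketch/get");
    static final SketchAction INDEX = new SketchAction("sketch/index");
    static final SketchGenericAction INTERNAL = new SketchGenericAction("sketch/internal");

    static SketchProxyClient build() {
        SketchActionRegistry registry = new SketchActionRegistry();
        registry.registerAction(GET);
        registry.registerAction(INDEX);
        registry.registerAction(INTERNAL);
        return new SketchProxyClient(registry.asInjectableMap());
    }

    public static void main(String[] args) {
        SketchProxyClient client = build();
        System.out.println(client.proxies.size());
        System.out.println(client.proxies.containsKey(GET));
    }
}
```

    Keying the proxies by the action object itself means the later lookup is a single Map.get(action) with no string parsing.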
    

    3 Request flow

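    We frequently use ES for create, read, update, and delete operations as well as Admin operations. All of them are ultimately executed through the proxy class TransportProxyClient. A Get operation is used as the example here; other Requests follow the same pattern.
    First, a demo of a Get: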

    public class App
    {
        public static void main(String[] args) {
            create();
        }
    
        public static void create(){
            Client client = getClient();
    
    
            getInfo(client);
            //getIndice(client);
            
        }
        private static void getInfo(Client client) {
            GetRequestBuilder getRequestBuilder = client.prepareGet("face_fixedperson", "Fixedperson", "126tc");
            GetResponse getFields = getRequestBuilder.execute().actionGet();
            String sourceAsString = getFields.getSourceAsString();
            System.out.println("-----" + sourceAsString);
        }
        private static void getIndice(Client client) {
            //Index index = new Index();
            AdminClient admin = client.admin();
            IndicesAdminClient indices = admin.indices();
    
            ListenableActionFuture<GetIndexResponse> execute = indices.prepareGetIndex().execute();
            GetIndexResponse getIndexResponse = execute.actionGet();
            String[] indices1 = getIndexResponse.getIndices();
            for (String in : indices1) {
                System.out.println(in);
            }
        }
    
        private static Client getClient(){
            Settings settings = Settings.builder().put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
                .put("client.transport.ping_timeout", "1200s").build();
            Client client = null;
            try {
                client = TransportClient.builder().settings(settings).build().addTransportAddress
                    (new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return client;
        }
    }
    
    

    When getRequestBuilder.execute() runs, the call first reaches this method of ActionRequestBuilder:

    public void execute(ActionListener<Response> listener) {
            client.execute(action, beforeExecute(request), listener);
        }
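
    The demo calls the no-argument execute() and then actionGet(), while the method shown here takes a listener. One common way to bridge the two, and as far as I can tell the approach 2.4.1 takes, is for the no-argument variant to create a future that is itself a listener and pass it to the listener variant. The sketch below illustrates that idea in plain Java; SketchListener, SketchFuture, and SketchRequestBuilder are hypothetical names, and the worker thread merely simulates a response arriving from the network.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for ActionListener. */
interface SketchListener<R> {
    void onResponse(R response);
    void onFailure(Throwable error);
}

/** A listener that doubles as a blocking future. */
final class SketchFuture<R> implements SketchListener<R> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile R response;
    private volatile Throwable error;

    @Override
    public void onResponse(R response) {
        this.response = response;
        done.countDown();
    }

    @Override
    public void onFailure(Throwable error) {
        this.error = error;
        done.countDown();
    }

    /** Blocks until one of the callbacks has fired, then returns or rethrows. */
    R actionGet() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (error != null) {
            throw new IllegalStateException("request failed", error);
        }
        return response;
    }
}

/** Hypothetical request builder offering both execute variants. */
final class SketchRequestBuilder {
    private final String id;

    SketchRequestBuilder(String id) { this.id = id; }

    /** Callback variant: answers on another thread, as a network client would. */
    void execute(SketchListener<String> listener) {
        Thread worker = new Thread(() -> {
            if (id.isEmpty()) {
                listener.onFailure(new IllegalArgumentException("empty id"));
            } else {
                listener.onResponse("doc-" + id);
            }
        });
        worker.start();
    }

    /** No-argument variant: create a future, pass it in as the listener, return it. */
    SketchFuture<String> execute() {
        SketchFuture<String> future = new SketchFuture<>();
        execute(future);
        return future;
    }
}

final class SketchFutureDemo {
    public static void main(String[] args) {
        System.out.println(new SketchRequestBuilder("126tc").execute().actionGet());
    }
}
```

    With this shape, blocking callers and callback-style callers share one code path, which is why following execute() in the demo leads straight to execute(listener).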
    

    It then reaches this method of AbstractClient:

    public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
            headers.applyTo(request);
            listener = threadedWrapper.wrap(listener);
            doExecute(action, request, listener);
        }
    

    The doExecute called here is TransportClient's doExecute, which simply delegates the actual execute operation to TransportProxyClient.

    @Override
        protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
            proxy.execute(action, request, listener);
        }
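
    The relationship between AbstractClient.execute and TransportClient.doExecute is a template method: the base class does the shared pre-processing in a final method and leaves one hook for the subclass. A simplified sketch follows, with hypothetical class names and a synchronous String return value in place of the listener, purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AbstractClient. */
abstract class SketchAbstractClient {
    final List<String> trace = new ArrayList<>();

    /** Shared pre-processing, then hand off to the subclass hook. */
    final String execute(String action, String request) {
        trace.add("AbstractClient.execute");
        // Stands in for headers.applyTo(request) and the listener wrapping.
        String decorated = request + " [headers applied]";
        return doExecute(action, decorated);
    }

    /** The single extension point each concrete client implements. */
    protected abstract String doExecute(String action, String request);
}

/** Hypothetical stand-in for TransportProxyClient. */
final class SketchProxy {
    String execute(String action, String request) {
        return action + " -> " + request;
    }
}

/** Hypothetical stand-in for TransportClient: the hook just delegates. */
final class SketchTransportClient extends SketchAbstractClient {
    private final SketchProxy proxy = new SketchProxy();

    @Override
    protected String doExecute(String action, String request) {
        trace.add("TransportClient.doExecute");
        return proxy.execute(action, request);
    }
}

final class SketchDelegationDemo {
    public static void main(String[] args) {
        SketchTransportClient client = new SketchTransportClient();
        System.out.println(client.execute("get", "id=126tc"));
        System.out.println(client.trace);
    }
}
```

    Because execute is final in the base class, every client type goes through the same pre-processing, and only the last hop differs.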
    

    Next, the execute method of TransportProxyClient:

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
            final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
            nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
                @Override
                public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                    proxy.execute(node, request, listener);
                }
            }, listener);
        }
    

    TransportActionNodeProxy<Request, Response> proxy = proxies.get(action); looks up the TransportActionNodeProxy for the given Action; see the TransportProxyClient constructor above for how that map is built.
    nodesService.execute then selects one of the available nodes (spreading requests across them) and calls proxy.execute(node, request, listener), which sends the request to that node.
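
    The node-selection step can be sketched as a small service that picks a node and hands it to a callback. The sketch assumes the selection simply rotates through the node list with a counter; that strategy is an assumption of the sketch, so check TransportClientNodesService in the 2.4.1 source for the exact behavior. SketchNodeCallback and SketchNodesService are hypothetical names, node names are plain strings, and the retry-on-failure handling of the real class is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for NodeListenerCallback. */
interface SketchNodeCallback<R> {
    R doWithNode(String node);
}

/** Hypothetical stand-in for TransportClientNodesService. */
final class SketchNodesService {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    SketchNodesService(List<String> nodes) {
        // Assumption: fail early here; the sketch has no "no node available" path later.
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes available");
        }
        this.nodes = nodes;
    }

    /** Picks the next node in rotation and lets the callback use it. */
    <R> R execute(SketchNodeCallback<R> callback) {
        // floorMod keeps the index valid even if the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return callback.doWithNode(nodes.get(index));
    }
}

final class SketchNodesDemo {
    /** Sends three fake requests and records which node served each. */
    static List<String> pickThree() {
        SketchNodesService service = new SketchNodesService(Arrays.asList("node-a", "node-b"));
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // The callback plays the role of proxy.execute(node, request, listener).
            picked.add(service.execute(node -> node));
        }
        return picked;
    }

    public static void main(String[] args) {
        System.out.println(pickThree());
    }
}
```

    Passing the node into a callback, rather than returning it, keeps the selection policy inside the service, so callers such as the proxy never need to know how a node was chosen.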

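    To summarize: when TransportClient is initialized, ES binds a variety of Modules and, as part of the same initialization, creates the proxy class TransportProxyClient. When a Request is submitted through TransportClient, the proxy class is what actually executes it: it selects one of the available nodes and sends the request to that node. That is the prelude to an ES operation; the details of the operations themselves will be covered in a later post.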
