美文网首页elasticsearch源码分析
Elasticsearch源码分析-架构设计之Action

Elasticsearch源码分析-架构设计之Action

作者: 尹亮_36cd | 来源:发表于2019-04-01 15:00 被阅读0次

    1 请求URL 与对应Action

    如果我们发送如下的http请求,elasticsearch是如何匹配对应Action的呢?

    curl -XGET 'http://elasticsearch_ip:port/xxx/url/path' -d '{
        "请求的content"
    }'
    

    在org.elasticsearch.rest.action包下面,存放了处理各种请求的rest action类
    在每个action的构造方法中,会将当前action对象注册到指定的url上

    /**
     * Template of a REST action. On construction it registers itself with the
     * RestController for the GET and POST methods on "/xxx/url/path".
     */
    public class RestXxxAction extends BaseRestHandler {
    
        @Inject
        public RestXxxAction(Settings settings, RestController controller, Client client) {
            super(settings, controller, client);
            // The same handler instance is registered for both HTTP methods on this path.
            controller.registerHandler(GET, "/xxx/url/path", this);
            controller.registerHandler(POST, "/xxx/url/path", this);
        }
    }
    

    在registerHandler()方法中,会将url path和对应handler添加到http method对象上

    /**
     * Excerpt of RestController showing how REST handlers are registered.
     */
    public class RestController extends AbstractLifecycleComponent<RestController> {
        /**
         * Registers a handler for the given HTTP method and URL path.
         *
         * @param method  HTTP method the handler should serve
         * @param path    URL path the handler is bound to
         * @param handler handler to invoke for matching requests
         * @throws ElasticsearchIllegalArgumentException if the method is not one of
         *         GET, DELETE, POST, PUT, OPTIONS or HEAD
         */
        public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
            // Each supported HTTP method has its own handler collection; the path is inserted
            // into the collection that belongs to the requested method.
            switch (method) {
                case GET:
                    getHandlers.insert(path, handler);
                    break;
                case DELETE:
                    deleteHandlers.insert(path, handler);
                    break;
                case POST:
                    postHandlers.insert(path, handler);
                    break;
                case PUT:
                    putHandlers.insert(path, handler);
                    break;
                case OPTIONS:
                    optionsHandlers.insert(path, handler);
                    break;
                case HEAD:
                    headHandlers.insert(path, handler);
                    break;
                default:
                    // Any other method has no handler collection and is rejected.
                    throw new ElasticsearchIllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
            }
        }
    }
    

    elasticsearch使用HttpRequestHandler接收http请求消息,最终会在executeHandler()方法中调用getHandler()方法获取path对应的handler

    /**
     * Excerpt of RestController showing how an incoming REST request is dispatched.
     */
    public class RestController extends AbstractLifecycleComponent<RestController> {
        /**
         * Looks up the handler for the request and delegates to it. When no handler
         * is found, an OPTIONS request is answered with OK and every other request
         * with BAD_REQUEST and an explanatory message.
         *
         * @param request the incoming REST request
         * @param channel channel used to send the response
         * @throws Exception propagated from the handler
         */
        void executeHandler(RestRequest request, RestChannel channel) throws Exception {
            final RestHandler matched = getHandler(request);
            if (matched != null) {
                matched.handleRequest(request, channel);
                return;
            }

            // No handler registered for this method/path combination.
            if (request.method() == RestRequest.Method.OPTIONS) {
                channel.sendResponse(new BytesRestResponse(OK));
                return;
            }

            final String message =
                    "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
            channel.sendResponse(new BytesRestResponse(BAD_REQUEST, message));
        }
    }
    

    在getHandler()方法中,主要是根据method和path获取在RestXxxAction中添加的handler

    /**
     * Excerpt of RestController showing how the handler for a request is resolved.
     */
    public class RestController extends AbstractLifecycleComponent<RestController> {
        /**
         * Resolves the handler registered for the request's HTTP method and path.
         *
         * @param request the incoming REST request
         * @return the result of the per-method lookup, or null when the method is
         *         not one of GET, POST, PUT, DELETE, HEAD or OPTIONS
         */
        private RestHandler getHandler(RestRequest request) {
            final String requestPath = getPath(request);
            final RestRequest.Method httpMethod = request.method();
            if (httpMethod == null) {
                // A switch on null would throw; an unknown method simply has no handler.
                return null;
            }
            switch (httpMethod) {
                case GET:
                    return getHandlers.retrieve(requestPath, request.params());
                case POST:
                    return postHandlers.retrieve(requestPath, request.params());
                case PUT:
                    return putHandlers.retrieve(requestPath, request.params());
                case DELETE:
                    return deleteHandlers.retrieve(requestPath, request.params());
                case HEAD:
                    return headHandlers.retrieve(requestPath, request.params());
                case OPTIONS:
                    return optionsHandlers.retrieve(requestPath, request.params());
                default:
                    return null;
            }
        }
    }
    

    elasticsearch在获取到path对应的Handler后,就执行该handler的handleRequest()方法,作为rest逻辑入口

    2 Action与对应TransportAction

    在ActionModule中,会将Action和对应的TransportAction注册到actions对象中

    /**
     * Excerpt of ActionModule showing how an Action is associated with its
     * TransportAction implementation class.
     */
    public class ActionModule extends AbstractModule {
        @Override
        protected void configure() {
            // Template registration: binds the Action singleton to its transport implementation class.
            registerAction(XxxAction.INSTANCE, TransportXxxAction.class);
        }
    
        /**
         * Records an action together with its transport action class in the
         * actions map, keyed by the action's name.
         *
         * @param action                  the action to register
         * @param transportAction         class of the TransportAction that executes it
         * @param supportTransportActions additional classes passed through to the ActionEntry
         */
        public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
                            GenericAction<Request, 
                            Response> action, Class<? extends TransportAction<Request, Response>> transportAction, 
                            Class... supportTransportActions) {
            actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
        }
    }
    

    在RestXxxAction中,最终会调用AbstractClient的xxx方法,并传入Action参数为XxxAction.INSTANCE,即为在ActionModule中注册的Action类

    /**
     * Excerpt of AbstractClient showing how a client API method forwards to
     * execute() with the Action singleton.
     */
    public abstract class AbstractClient implements Client {
        // NOTE(review): the method is a placeholder ("xxx"/XxxAction) but the parameter types are
        // SuggestRequest/SuggestResponse — presumably copied from suggest(); confirm which was intended.
        @Override
        public void xxx(final SuggestRequest request, final ActionListener<SuggestResponse> listener) {
            // Delegates to the generic execute() with the Action instance as the dispatch key.
            execute(XxxAction.INSTANCE, request, listener);
        }
    }
    

    在NodeClient中会根据传入的Action参数从actions获取在ActionModule中注册的TransportAction,并执行其execute()方法

    /**
     * Excerpt of NodeClient showing how an Action is resolved to a TransportAction.
     */
    public class NodeClient extends AbstractClient {
        /**
         * Applies the client headers to the request, looks up the TransportAction
         * stored for the given action and executes it.
         *
         * @param action   the action used as lookup key
         * @param request  the request to execute
         * @param listener notified by the transport action
         */
        @Override
        public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
            headers.applyTo(request);
            // NOTE(review): no null check on the lookup result in this excerpt — presumably every
            // action passed here has been registered; confirm.
            TransportAction<Request, Response> transportAction = actions.get((ClientAction)action); 
            transportAction.execute(request, listener);
        }
    }
    

    在TransportAction类图和execute()方法中可以看出:
    ① TransportXxxAction都继承自TransportAction类
    ② TransportAction的execute()方法仅调用了doExecute()方法
    ③ TransportXxxAction重写了TransportAction的doExecute()方法
    因此,最终的对应关系为:在ActionModule中注册的Action,调用的是对应TransportXxxAction的doExecute()方法

    TransportAction类图

    TransportAction的execute()方法仅调用了需要子类重写的抽象方法doExecute()

    public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
        
        protected abstract void doExecute(Request request, ActionListener<Response> listener);
    
        public final void execute(Request request, ActionListener<Response> listener) {
            // ...
            if (filters.length == 0) {
                try {
                    // TransportAction 子类都要重写这个方法
                    doExecute(request, listener);
                } catch(Throwable t) {
                    logger.trace("Error during transport action execution.", t);
                    listener.onFailure(t);
                }
            } else {
                RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
                requestFilterChain.proceed(actionName, request, listener);
            }
        }
    }
    

    3 节点间用Action匹配Handler

    如果elasticsearch要请求的节点不是当前节点,则需要将请求发送到对应的节点上执行
    在TransportXxxAction中会将ACTION和对应的Handler注册到对应的serverHandlers对象中,在使用transportService发送请求后,MessageChannelHandler.messageReceived()会接收到消息,然后在handleRequest()方法中获取注册的Handler,执行其messageReceived()方法或者交给线程池处理

    /**
     * Template of a node-to-node transport action. It registers a request handler
     * under XXX_ACTION_NAME and either runs the logic locally or sends the request
     * to the node that has to execute it.
     */
    public class TransportXxxAction {
        private final TransportService transportService;
        private final ClusterService clusterService;
        private final XxxService xxxService;

        @Inject
        public TransportXxxAction(Settings settings,
                            ThreadPool threadPool,
                            TransportService transportService,
                            ClusterService clusterService,
                            XxxService xxxService) {
            this.transportService = transportService;
            this.clusterService = clusterService;
            this.xxxService = xxxService;
            // Register last: the handler reads xxxService, so the fields must be set
            // before a request can be dispatched to it.
            transportService.registerHandler(XXX_ACTION_NAME, new XxxTransportHandler());
        }

        /**
         * Executes the request on the given node.
         *
         * @param node     node that has to run the request
         * @param request  request to execute
         * @param listener receives the result, or the failure
         */
        public void executeXxx(DiscoveryNode node, final XxxTransportRequest request, final XxxListener listener) {
            // The target node is the local node: run the logic in place.
            if (clusterService.state().nodes().localNodeId().equals(node.id())) {
                try {
                    listener.onResult(xxxService.execute(request));
                } catch (Throwable e) {
                    // Report local failures the same way remote failures are reported.
                    listener.onFailure(e);
                }
            } else {
                // The target is a remote node: send the request over the transport layer.
                // NOTE(review): assumes QueryXxxResult is an XxxProvider — confirm.
                transportService.sendRequest(node, XXX_ACTION_NAME, request, new BaseTransportResponseHandler<XxxProvider>() {
                    @Override
                    public XxxProvider newInstance() {
                        return new QueryXxxResult();
                    }
                    @Override
                    public void handleResponse(XxxProvider response) {
                        listener.onResult(response);
                    }
                    @Override
                    public void handleException(TransportException exp) {
                        listener.onFailure(exp);
                    }
                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
                });
            }
        }

        private class XxxTransportHandler extends BaseTransportRequestHandler {
            /**
             * Receives the transport request sent by another node and runs the Xxx logic.
             *
             * @param request   TransportRequest
             * @param channel   TransportChannel
             * @throws Exception    Exception
             */
            @Override
            public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
                // Logic executed on the remote (receiving) node.
                XxxProvider result = xxxService.execute(request);
                // Send the result back to the calling node.
                channel.sendResponse(result);
            }

            @Override
            public String executor() {
                return ThreadPool.Names.Xxx;
            }
        }
    }
    

    在handleRequest()方法中,通过transportServiceAdapter.handler()根据注册的action获取对应的TransportHandler,如果executor是same则直接执行handler的messageReceived()方法,否则交给线程池执行

    public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
        protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
            final String action = buffer.readString();
            transportServiceAdapter.onRequestReceived(requestId, action);
            final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
            try {
                final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
                if (handler == null) {
                    throw new ActionNotFoundTransportException(action);
                }
                final TransportRequest request = handler.newInstance();
                request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
                request.readFrom(buffer);
                if (ThreadPool.Names.SAME.equals(handler.executor())) {
                    //noinspection unchecked
                    handler.messageReceived(request, transportChannel);
                } else {
                    threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
                }
            } catch (Throwable e) {
                try {
                    transportChannel.sendResponse(e);
                } catch (IOException e1) {
                    logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                    logger.warn("Actual Exception", e1);
                }
            }
            return action;
        }
    }
    

    下面讲另外一种情况,在ActionModule中注册的Action也都会注册到handler中

    public class TransportXxxAction extends HandledTransportAction {
        @Inject
        public TransportXxxAction (Settings settings, ThreadPool threadPool) {
            super(settings, XxxAction.NAME, threadPool);
    }
    

    在HandledTransportAction类中同样使用transportService.registerHandler()方法注册handler

    /**
     * Excerpt of HandledTransportAction: a TransportAction that registers a
     * transport handler under its own action name when it is constructed.
     */
    public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
        /**
         * @param settings         passed to the TransportAction constructor
         * @param actionName       name the handler is registered under
         * @param threadPool       passed to the TransportAction constructor
         * @param transportService service the handler is registered with
         * @param actionFilters    passed to the TransportAction constructor
         */
        protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
            super(settings, actionName, threadPool, actionFilters);
            // NOTE(review): only newInstance() is shown for the handler; the rest of
            // TransportHandler is presumably elided in this excerpt — confirm.
            transportService.registerHandler(actionName, new TransportHandler() {
                @Override
                public Request newInstance(){
                    // Delegates request creation to newRequestInstance().
                    return newRequestInstance();
                }
            });
        }
    }
    

    因此在InternalTransportClient中,主要是回调proxy.execute()

    public class InternalTransportClient extends AbstractClient {
       @Inject
        public InternalTransportClient(
                                       TransportClientNodesService nodesService, 
                                       InternalTransportAdminClient,
                                       Map<String, GenericAction> actions, 
                                       Headers headers) {
            MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
            for (GenericAction action : actions.values()) {
                if (action instanceof Action) {
                    actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
                }
            }
            this.actions = actionsBuilder.immutableMap();
        }
        @Override
        public void execute(final Action action, final Request request, ActionListener listener) {
            headers.applyTo(request);
            final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
            nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
                @Override
                public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                    proxy.execute(node, request, listener);
                }
            }, listener);
        }
    }
    

    transportService.sendRequest()中的action.name()即为Action的Name参数,已被注册到对应的handler中了

    /**
     * Excerpt of the proxy that sends an action's request to a specific node,
     * using the action name as the transport action name.
     */
    public class TransportActionNodeProxy extends AbstractComponent {
        /**
         * Validates the request and, if valid, sends it to the given node.
         *
         * @param node     node the request is sent to
         * @param request  request to validate and send
         * @param listener receives the validation failure, the response or the transport failure
         */
        public void execute(DiscoveryNode node,
                            final Request request,
                            final ActionListener<Response> listener) {
            final ActionRequestValidationException invalid = request.validate();
            if (invalid != null) {
                // Invalid requests are rejected locally and never sent.
                listener.onFailure(invalid);
                return;
            }

            transportService.sendRequest(node, action.name(), request, transportOptions,
                    new BaseTransportResponseHandler<Response>() {
                        @Override
                        public void handleResponse(Response response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            listener.onFailure(exp);
                        }

                        @Override
                        public Response newInstance() {
                            return action.newResponse();
                        }

                        @Override
                        public String executor() {
                            // The LISTENER pool is used only when the request asks for a threaded listener.
                            return request.listenerThreaded()
                                    ? ThreadPool.Names.LISTENER
                                    : ThreadPool.Names.SAME;
                        }
                    });
        }
    }
    

    相关文章

      网友评论

        本文标题:Elasticsearch源码分析-架构设计之Action

        本文链接:https://www.haomeiwen.com/subject/sglrbqtx.html