美文网首页
ElasticSearch数据传输机制

ElasticSearch数据传输机制

作者: 报纸团 | 来源:发表于2017-07-10 21:38 被阅读0次

    ElasticSearch的数据传输服务TransportService

    ElasticSearch的数据传输服务是在TransportService类中实现的。TransportService的核心方法是sendRequest,如下图所示:

    sendRequest方法

    从上面的代码段可以看出几个有用的信息:

    1. 首先看这一句:
    生成requestId

    这一句表明,每个被传输的请求,均包含一个requestId,根据requestId的生成函数newRequestId()的实现来看,requestId实际是一个自增的long型变量。这一变量的作用就是为了标识每一次请求。在接收方处理完请求,并返回应答时,需要将请求的requestId带回,以便发送方收到应答后,能够确定是对哪次请求的应答。

    1. 再来看这一句:
    响应回调句柄对象

    在发送传输请求时,同时指定了返回数据的回调句柄对象:TransportResponseHandler<T> hander。这一回调句柄对象被注册到clientHandlers容器中。
    clientHandlers可以看做是一个Map,map的key值是requestId,map的value值是对应的TransportResponseHandler。可以通过clientHandlers.get(requestId)这样的调用来获取到对应的ResponseHandler。

    1. 最后看这一段
    调用不同方法发送Request

    在发送请求的时候,需要指定发送的目标节点。如果目标节点是本机,直接调用sendLocalRequest方法即可,这一方法不需要通过网络协议进行传输。如果目标节点不是本机,则调用transport成员的sendRequest方法实现数据发送。

    Transport组件

    ElasticSearch的节点间数据传输组件被抽象成Transport接口。并通过构造函数注入的方式注入到TransportService对象中,如下图所示:

    将transport对象注入到TransportService对象中

    可以看到,transport对象和threadPool对象都是通过构造函数注入的方式注入到TransportService中的。
    ,实际上,Transport这一接口的实现类仅有NettyTransport这一个。所以,可以认为ElasticSearch的节点间通讯就是通过Netty来实现的。

    NettyTransport

    由于Netty基于拦截器模式实现的NIO通讯框架,因此Netty的响应处理机制要通过如下代码说明:

    Netty的ChannelPipeline设置

    从上图的代码可以看出,ServerChannelPipelineFactory在pipeline上主要添加了两个Handler,一个是SizeHeaderFrameDecoder,一个是MessageChannelHandler。

    SizeHeaderFrameDecoder

    SizeHeaderFrameDecoder在ChannelPipeline中被命名为“size”,考虑到Netty本身也内置一个类似SizeHeaderFrameDecoder的Decoder,因此,很自然的理解为该Decoder是负责通过一个数据包长度的字段来指示包的长度的。而实际上,elasticsearch的SizeHeaderFrameDecoder的功能远比简单的一个包长度复杂,Netty的数据包头也不仅是一个包长度信息。下面详细介绍一下Netty数据包的包头数据结构。

    Netty数据包头格式

    NettyHeader

    NettyHeader数据的格式如下:

    字段名称| 字段长度(字节)| 说明|备注
    ----|------|----
    MARKER_BYTES_SIZE| 2 | 起始标识 | “ES”两个大写字母
    MESSAGE_LENGTH_SIZE| 4 | 消息长度 | int型变量
    REQUEST_ID_SIZE| 8 | 消息ID | long型变量,请求发起方自增生成
    STATUS_SIZE | 1 |状态变量|消息的flag集合,下面详细说明
    VERSION_ID_SIZE| 4 | 版本信息 |

    STATUS字段

    NettyHeader中的Status字段的意义在TransportStatus类中定义。STATUS字段主要包含三个标志位:

    • STATUS_REQRES
    • STATUS_ERROR
    • STATUS_COMPRESS

    TransportStatus的代码如下所示:

    TransportStatus.java

    下图图示中列出了STATUS字段(单字节)各个标识位的位置和意义。可以看出,只有后三位是有意义的。

    7|6|5|4|3|3|2|1|0
    ----|----|----|----|----|----|----|----
    -|-|-|-|-|-|压缩标识|Error标识|response标识(request为0,response为1)

    MessageChannelHandler

    MessageChannelHandler在ChannelPipeline中被命名为“dispatcher”,这说明该Decoder负责决定接收到的数据包该交给那个具体的业务逻辑去处理。在MessageChannelHandler的业务逻辑中,如下三个成员起了重要作用:

    • transport
    • threadPool
    • transportServiceAdapter

    这三个成员是通过构造函数传入的,如下图所示:

    MessageChannelHandler的构造函数

    从上面的代码可以看出来,threadPool和transportServiceAdapter均来自于transport对象,因此,对于MessageChannelHandler来说,transport是至关重要的。

    transportServiceAdapter

    MessageChannelHandler的核心逻辑从messageReceived方法展开。但是,在进入messageReceived方法之前,我想先介绍一下transportServiceAdapter。这是后面关于messageReceived方法相关逻辑中需要涉及到的一个重要成员变量。

    TransportServiceAdapter接口和TransportService类

    transportServiceAdapter成员是TransportServiceAdapter接口的一个实现类,该接口的代码如下所示:

    TransportServiceAdapter接口

    该接口只有一个实现类,即TransportService.Adapter。其实,TransportServiceAdapter虽然命名为Adapter,但是,它的设计原意可能更接近门面模式。因为目标是使用一个更简单的接口来调用TransportService。TransportServiceAdapter接口有两个主要的获取消息处理句柄的方法,分别是:

    • onResponseReceived
    • getRequestHandler
      下面针对这两个函数,来看一下TransportService.Adapter的代码。
    TransportService.Adapter.onResponseReceived
    TransportService.Adapter.onResponseReceived

    从代码中可以看到,这部分代码的主要逻辑是从clientHandlers容器中,获取到response的处理句柄——ResponseHandler。关于clientHandlers,之前在介绍TransportService.sendRequest方法时,介绍过了。下面结合此部分代码,重新回忆和梳理一下request的发送和响应流程:

    1. 发送方构建Request,在提交Request的同时,还需要提供responseHandler的响应信息回调处理句柄对象。
    2. 发送方将构建的Request对象和responseHandler句柄传递给TransportService的sendRequest方法。
    3. TransportService的sendRequest方法首先给request分配一个requestId,然后将requestId和responseHandler已key-value对的方式存储到clientHandlers容器中。随后,sendRequest调用transport成员变量的sendRequest方法执行数据发送操作。
    4. 接收方接收到request,进行处理,并返回response。(这部分操作在下面会进一步描述)。然后通过请求的通道(channel)将response返回。
    5. 发送方通过Netty框架完成接收数据包的处理,根据数据包的status字段,判断这是一个Response,然后调用MessageChannelHandler的相应函数进行处理。MessageChannelHandler最终通过调用TransportService.Adapter.onResponseReceived方法在TransportService的clientHandlers中根据requestId查找到该response对应的handler处理句柄对象。
    6. 调用handler的handleResponse方法进行返回结果的处理。根据handler的执行线程选择,可能在数据接收线程里面直接进行处理,也可能在线程池调用线程进行处理。
      下面,首先对上述第5步的数据包接收处理过程进行详细描述。
    TransportService.Adapter.getRequestHandler方法
    TransportService.Adapter.getRequestHandler

    代码很简单,就是直接调用requestHandlers的get方法。requestHandlers也是个map,key值是action,value值是RequestHandlerRegistry,这个Registry中包含相应消息的hander句柄对象。

    那么requestHandlers是由谁构建的呢,这个requestHandler是在系统启动时,由各个消息相应的Action对象通过调用registerRequestHandler方法,注册到TransportService中的。整个ElasticSearch各个模块,其中大量功能是需要用到节点间通讯的。因此,ElasticSearch各个模块均会调用TransportService的registerRequestHandler方法。下面以SearchServiceTransportAction为例进行说明,代码如下:

    SearchServiceTransportAction

    从代码中可以看到,SearchServiceTransportAction中注册了大量不同类型的Request的处理句柄。

    MessageChannelHandler的messageReceived方法

    MessageChannelHandler作为Netty的Decoder的实现类。需要重载messageReceived方法。在该方法中,根据消息的status信息,来决定如何对消息进行处理。具体代码如下:

    MessageChannelHandler的消息分发逻辑

    可以看到,实际处理消息内容的函数有如下几个:

    • handleRequest
    • handlerResponseError
    • handleResponse
      以上这三个函数均是MessageChannelHandler的proteted方法。根据上面的业务逻辑,数据包的status标志位中,只有response才会出现error的情况。

    handleRequest函数

    handleRequest函数的代码如下图所示:

    MessageChannelHandler.handleRequest

    上述代码主要有三个需要注意的地方,已经在上面的代码中通过红色方框标出

    获取requestHandler

    通过调用transportServiceAdapter.getRequestHandler方法实现,这部分代码在前面介绍transportServiceAdpater成员变量的时候,已经进行了较为详细的说明。

    执行request的消息处理函数

    从代码上看,是根据request的处理句柄对象的执行方式设定来决定是在当前线程(Netty的消息处理线程)中进行消息处理还是在特定的线程池中完成消息处理。

    异常信息返回

    如果在request消息处理过程中发生异常,则调用transportChannel.sendResponse(Throwable e)方法将错误信息返回给request请求节点。

    handleResponse函数

    handleResponse函数的代码如下图所示:

    MessageChannelHandler.handleResponse

    handleResponse方法中并无特殊需要注意的代码。大致逻辑与handleRequest相同。

    相关文章

      网友评论

          本文标题:ElasticSearch数据传输机制

          本文链接:https://www.haomeiwen.com/subject/jstuhxtx.html