美文网首页
Thrift服务端源码分析(FrameBuffer )

Thrift服务端源码分析(FrameBuffer )

作者: 番薯和米饭 | 来源:发表于2020-02-21 01:16 被阅读0次

    Thrift服务端在调用使用Selector(选择器)进行轮询的时候,遇到读事件,写事件的时候,通过调用FrameBuffer.read(),FrameBuffer.write()来实现业务调用,FrameBuffer封装于AbstractNonblockingServer中.我们来看下相应的源代码:

    /**
         * Possible states for the FrameBuffer state machine.
         * FrameBuffer有可能的所有状态
          */
        private enum FrameBufferState {
            // in the midst of reading the frame size off the wire
            READING_FRAME_SIZE,//读消息大小,是buffer的第一个状态,这步多半是读取消息头从而知道消息的大小长度
            // reading the actual frame data now, but not all the way done yet
            READING_FRAME,//读消息体
            // completely read the frame, so an invocation can now happen
            READ_FRAME_COMPLETE,//读完消息,下面可以执行相应的逻辑操作
            // waiting to get switched to listening for write events
            AWAITING_REGISTER_WRITE,//等待切换到监听写事件,一般表示服务端已经执行完相应的业务逻辑了
            // started writing response data, not fully complete yet
            WRITING,//写返回数据,给客户端相应
            // another thread wants this framebuffer to go back to reading
            AWAITING_REGISTER_READ,//等待注册读事件
            // we want our transport and selection key invalidated in the selector
            // thread
            AWAITING_CLOSE//等待关闭
        }
    

    1.先读消息头4字节,计算出消息体的长度 ,状态 READING_FRAME_SIZE -> READING_FRAME

    2.根据长度读消息体,读完之后执行invoke方法处理业务,状态 READING_FRAME -> READ_FRAME_COMPLETE

    3.调用responseReady()做好写数据准备,切换状态,根据状态改变Selector的注册事件,状态 AWAITING_REGISTER_WRITE -> WRITING

    4.收到读事件后会调用write方法,会把buffer中的数据返回, 状态 WRITING-> READING_FRAME_SIZE循环开始下一次等待客户端请求

    FrameBuffer 源代码比细节性的东西比较多,我这里暂时只是读懂了大概的整体流程,细节性的问题我以后在慢慢研究。

    public class FrameBuffer {
        private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
    
        // the actual transport hooked up to the client.
        // 与客户端连接的实际传输。
        protected final TNonblockingTransport trans_;
    
        // the SelectionKey that corresponds to our transport
        protected final SelectionKey selectionKey_;
    
        // the SelectThread that owns the registration of our transport
        protected final AbstractSelectThread selectThread_;
    
        // where in the process of reading/writing are we?
        // 第一步设置为读消息头,通过消息头知道消息长度
        protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
    
        // the ByteBuffer we'll be using to write and read, depending on the state
        protected ByteBuffer buffer_;
    
        protected final TByteArrayOutputStream response_;
    
        // the frame that the TTransport should wrap.
        protected final TMemoryInputTransport frameTrans_;
    
        // the transport that should be used to connect to clients
        // 传输层属性
        protected final TTransport inTrans_;
    
        protected final TTransport outTrans_;
    
        // 协议层属性
        // the input protocol to use on frames
        protected final TProtocol inProt_;
    
        // the output protocol to use on frames
        protected final TProtocol outProt_;
    
        // context associated with this connection
        protected final ServerContext context_;
    
        public FrameBuffer(final TNonblockingTransport trans,
            final SelectionKey selectionKey,
            final AbstractSelectThread selectThread) {
          trans_ = trans;
          selectionKey_ = selectionKey;
          selectThread_ = selectThread;
          buffer_ = ByteBuffer.allocate(4);
    
          frameTrans_ = new TMemoryInputTransport();
          response_ = new TByteArrayOutputStream();
          // 设置传输层协议
          inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
          outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
          // 设置处理层协议
          inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
          outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
    
          // 与事件驱动相关的,这个我还没弄懂
          if (eventHandler_ != null) {
            context_ = eventHandler_.createContext(inProt_, outProt_);
          } else {
            context_  = null;
          }
        }
    
        /**
         * Give this FrameBuffer a chance to read. The selector loop should have
         * received a read event for this FrameBuffer.
         *
         * @return true if the connection should live on, false if it should be
         *         closed
         */
        public boolean read() {
          // 这个函数就是用来读取客户端发来的数据,详细实现过程我还没仔细看
          // 但是大家可以通过FrameBufferState的状态来看懂大概流程
          if (state_ == FrameBufferState.READING_FRAME_SIZE) {
            // 读消息头,获取消息体的长度,也是buffer的第一个状态
            // try to read the frame size completely
            if (!internalRead()) {
              return false;
            }
    
            // if the frame size has been read completely, then prepare to read the
            // actual frame.
            if (buffer_.remaining() == 0) {
              // pull out the frame size as an integer.
              int frameSize = buffer_.getInt(0);
              if (frameSize <= 0) {
                LOGGER.error("Read an invalid frame size of " + frameSize
                    + ". Are you using TFramedTransport on the client side?");
                return false;
              }
    
              // if this frame will always be too large for this server, log the
              // error and close the connection.
              if (frameSize > MAX_READ_BUFFER_BYTES) {
                LOGGER.error("Read a frame size of " + frameSize
                    + ", which is bigger than the maximum allowable buffer size for ALL connections.");
                return false;
              }
    
              // if this frame will push us over the memory limit, then return.
              // with luck, more memory will free up the next time around.
              if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
                return true;
              }
    
              // increment the amount of memory allocated to read buffers
              readBufferBytesAllocated.addAndGet(frameSize + 4);
    
              // reallocate the readbuffer as a frame-sized buffer
              buffer_ = ByteBuffer.allocate(frameSize + 4);
              buffer_.putInt(frameSize);
              
              // 状态设置为FrameBufferState.READING_FRAME,表示开始读消息体
              state_ = FrameBufferState.READING_FRAME;
            } else {
              // this skips the check of READING_FRAME state below, since we can't
              // possibly go on to that state if there's data left to be read at
              // this one.
              return true;
            }
          }
    
          // it is possible to fall through from the READING_FRAME_SIZE section
          // to READING_FRAME if there's already some frame data available once
          // READING_FRAME_SIZE is complete.
    
          if (state_ == FrameBufferState.READING_FRAME) {
            // 通过上面代码已经将buffer_的大小设置为消息体 的大小,这次读取整个消息体。
            if (!internalRead()) {
              return false;
            }
    
            // since we're already in the select loop here for sure, we can just
            // modify our selection key directly.
            if (buffer_.remaining() == 0) {
              // get rid of the read select interests
              selectionKey_.interestOps(0);
              // 读完消息,return 以后 下面可以执行操作
              state_ = FrameBufferState.READ_FRAME_COMPLETE;
            }
    
            return true;
          }
    
          // if we fall through to this point, then the state must be invalid.
          LOGGER.error("Read was called but state is invalid (" + state_ + ")");
          return false;
        }
    
        /**
         * Give this FrameBuffer a chance to write its output to the final client.
         */
        public boolean write() {
          if (state_ == FrameBufferState.WRITING) {
            try {
              if (trans_.write(buffer_) < 0) {
                return false;
              }
            } catch (IOException e) {
              LOGGER.warn("Got an IOException during write!", e);
              return false;
            }
    
            // we're done writing. now we need to switch back to reading.
            if (buffer_.remaining() == 0) {
              prepareRead();
            }
            return true;
          }
    
          LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
          return false;
        }
    
        /**
         * Give this FrameBuffer a chance to set its interest to write, once data
         * has come in.
         */
         /**
         * 这个函数就是根据FrameBuffer.state_的状态选择下一步干啥
         */
        public void changeSelectInterests() {
          switch (state_) {
            // 是等待切换到监听写事件,设置为SelectionKey.OP_WRITE,往客户端写数据
          case AWAITING_REGISTER_WRITE:
            // set the OP_WRITE interest
            selectionKey_.interestOps(SelectionKey.OP_WRITE);
            state_ = FrameBufferState.WRITING;
            break;
            // 等待注册读事件,调用prepareRead()方法接受客户端下一次的请求
          case AWAITING_REGISTER_READ:
            prepareRead();
            break;
            // 等待关闭状态 关闭FrameBuffer
          case AWAITING_CLOSE:
            close();
            selectionKey_.cancel();
            break;
          default:
            LOGGER.error(
                "changeSelectInterest was called, but state is invalid ({})",
                state_);
          }
        }
    
        /**
         * Shut the connection down.
         */
        public void close() {
          // if we're being closed due to an error, we might have allocated a
          // buffer that we need to subtract for our memory accounting.
          if (state_ == FrameBufferState.READING_FRAME ||
              state_ == FrameBufferState.READ_FRAME_COMPLETE ||
              state_ == FrameBufferState.AWAITING_CLOSE) {
            readBufferBytesAllocated.addAndGet(-buffer_.array().length);
          }
          trans_.close();
          if (eventHandler_ != null) {
            eventHandler_.deleteContext(context_, inProt_, outProt_);
          }
        }
    
        /**
         * Check if this FrameBuffer has a full frame read.
         */
        public boolean isFrameFullyRead() {
          return state_ == FrameBufferState.READ_FRAME_COMPLETE;
        }
    
        /**
         * After the processor has processed the invocation, whatever thread is
         * managing invocations should call this method on this FrameBuffer so we
         * know it's time to start trying to write again. Also, if it turns out that
         * there actually isn't any data in the response buffer, we'll skip trying
         * to write and instead go back to reading.
         */
       /**
         * 看函数名,就是准备回应,调用这个函数一般是客户端rpc请求调用服务端代码,
         * 服务端业务逻辑处理完后开始回应客户端,将结果返回过去,如果服务端的业务逻辑
         * 处理失败,回应的buffer为空,这次将不会回应数据,开始下一次的读等待
         */
        public void responseReady() {
          // the read buffer is definitely no longer in use, so we will decrement
          // our read buffer count. we do this here as well as in close because
          // we'd like to free this read memory up as quickly as possible for other
          // clients.
          //貌似通过这个方法快速释放读buffer里面的空间
          readBufferBytesAllocated.addAndGet(-buffer_.array().length);
    
          if (response_.len() == 0) {
            // go straight to reading again. this was probably an oneway method
            // 如果回应的数据为空,则设置状态为等待注册读事件
            state_ = FrameBufferState.AWAITING_REGISTER_READ;
            buffer_ = null;
          } else {
            // 调用成功,有返回数据
            buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
    
            // set state that we're waiting to be switched to write. we do this
            // asynchronously through requestSelectInterestChange() because there is
            // a possibility that we're not in the main thread, and thus currently
            // blocked in select(). (this functionality is in place for the sake of
            // the HsHa server.)
            /**
             * 设置状态,我们正在等待被切换到写入。我们通过requestSelectInterestChange()异步执行此操作,
             * 因为有可能我们不在主线程中,因此当前阻塞在select()中。(这个方法功能是为了HsHa服务器而准备的。)
             */
            // 设置为等待切换到监听写事件 表示下一步开始要往客户端写数据了
            state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
          }
          requestSelectInterestChange();
        }
        /**
         * Actually invoke the method signified by this FrameBuffer.
         * invoke()函数开始进行服务端的业务处理
         */
        public void invoke() {
          frameTrans_.reset(buffer_.array());
          response_.reset();
    
          try {
            if (eventHandler_ != null) {
              eventHandler_.processContext(context_, inTrans_, outTrans_);
            }
            // 开始调用服务端业务函数进行业务处理
            processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
            responseReady();
            return;
          } catch (TException te) {
            LOGGER.warn("Exception while invoking!", te);
          } catch (Throwable t) {
            LOGGER.error("Unexpected throwable while invoking!", t);
          }
          // This will only be reached when there is a throwable.
          state_ = FrameBufferState.AWAITING_CLOSE;
          requestSelectInterestChange();
        }
    
        /**
         * Perform a read into buffer.
         *
         * @return true if the read succeeded, false if there was an error or the
         *         connection closed.
         */
        private boolean internalRead() {
          try {
            if (trans_.read(buffer_) < 0) {
              return false;
            }
            return true;
          } catch (IOException e) {
            LOGGER.warn("Got an IOException in internalRead!", e);
            return false;
          }
        }
    
        /**
         * We're done writing, so reset our interest ops and change state
         * accordingly.
         */
         /**
         * 完成了写状态,重新设置为读状态,等待下一次客户端的请求
         */
        private void prepareRead() {
          // we can set our interest directly without using the queue because
          // we're in the select thread.
          selectionKey_.interestOps(SelectionKey.OP_READ);
          // get ready for another go-around
          buffer_ = ByteBuffer.allocate(4);
          //设置为开始下次读消息头,获取表示消息体的长度,也是buffer的第一个状态
          state_ = FrameBufferState.READING_FRAME_SIZE;
        }
    
        /**
         * When this FrameBuffer needs to change its select interests and execution
         * might not be in its select thread, then this method will make sure the
         * interest change gets done when the select thread wakes back up. When the
         * current thread is this FrameBuffer's select thread, then it just does the
         * interest change immediately.
         */
     /**
         * 当这个FrameBuffer需要更改它的select感兴趣的状态,执行的线程可能不是SelectThread,
         * 那么这个方法将确保在SelectThread线程恢复时完成感兴趣的状态更改。如果当当前线程是
         * 这个FrameBuffer的SelectThread时,它只会立即改变感兴趣的状态。(这个方法功能是为了HsHa服务器而准备的。)
         */
        protected void requestSelectInterestChange() {
          if (Thread.currentThread() == this.selectThread_) {
            changeSelectInterests();
          } else {
            // 如果不是selectThread,则调用requestSelectInterestChange()方法
            // 将FrameBuffer放到SelectThread里面的一个Set里面,这个Set声明在AbstractSelectThread类里
            this.selectThread_.requestSelectInterestChange(this);
          }
        }
      } // FrameBuffer
    

    相关文章

      网友评论

          本文标题:Thrift服务端源码分析(FrameBuffer )

          本文链接:https://www.haomeiwen.com/subject/ggubfhtx.html