美文网首页
Hadoop RPC源码分析(一)

Hadoop RPC源码分析(一)

作者: Bottle丶Fish | 来源:发表于2018-02-04 22:41 被阅读65次

    ipc.Server 类分析

    Hadoop采用了Master/Slave结构。其中,Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server将高并发和可扩展性作为设计目标。为此,ipc.Server采用了很多具有提高并发处理能力的技术,主要包括线程池、事件驱动Reactor设计模式等。这些技术采用了JDK自带的库实现。我们先重点分析它是如何利用Reactor设计模式提高整体性能的。

    01 Reactor设计模式

    Reactor是并发编程中的一种基于事件驱动的设计模式。它具有以下2个特点:

    • 通过派发/分离IO操作时间提高系统的并发性能
    • 提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理

    一个典型的Reactor模式中主要包括以下几个角色:

    • Reactor:IO事件的派发者
    • Acceptor:接收来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler
    • Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。Handler内部往往会有更进一步的层次划分,用来抽象诸如read,decode,compute,encode和send等过程。在Reactor模式中,业务逻辑被分散的IO事件所打破,所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续上次中断的处理。
    • Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程池中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。

    ipc.Server实现了一个典型的Reactor设计模式,其整体架构与上述完全一致。了解了Reactor的架构之后,能够帮助理解和学习ipc.Server的设计思路及实现。下面就分析Ipc.Server的实现细节。

    02 ipc.Server实现细节

    用eclipse打开已经编译好的源码,找到ipc.Server,使用eclipse的quick outline查看一下该类的大致结构。


    ipc.Server outline

    源码内容很多,要先找到下手的地方,通过outline可以捕获到Server有几个内部类,这几个类是什么作用这是需要关心的,然后就是程序的入口,这个start()方法。先看看start()做了什么。

    启动服务
    Server.start()

      public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        
        for (int i = 0; i < handlerCount; i++) {
          handlers[i] = new Handler(i);
          handlers[i].start();
        }
      }
    

    start()方法启动了几个对象,通过名称可以知道他们是几个内部类的实例,那下一步就应该分析一下每个类的作用。大致查看一下这几个类,发现都继承自Thread类,也就是说每个类都启动了一个新的线程,那么重点就是去考察这个几个线程主体干了什么。

    Responder、Listener和Handler
    1.Responder

    private class Responder extends Thread {
         // 代码.... 此处省略
        @Override
        public void run() {
         // 代码.... 此处省略
          while (running) {
            try {
              writeSelector.select(PURGE_INTERVAL);
              Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                try {
                  if (key.isValid() && key.isWritable()) {
                      doAsyncWrite(key);
                  }
                } catch (IOException e) {
                 // 代码.... 此处省略
                }
              }  
            } 
         // 代码.... 此处省略
        }
    }
    
    
    1. Listener
      private class Listener extends Thread {
         // 代码.... 此处省略
        @Override
        public void run() {
          // 代码.... 此处省略
          while (running) {
            SelectionKey key = null;
            try {
              selector.select();
              Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                try {
                  if (key.isValid()) {
                    if (key.isAcceptable())
                      doAccept(key);
                  }
                } catch (IOException e) {
                }
                key = null;
              }
            } catch (OutOfMemoryError e) {
              // 代码.... 此处省略
            } catch (Exception e) {
              // 代码.... 此处省略
            }
            // 代码.... 此处省略
          }
         // 代码.... 此处省略
      }
    
    1. Handler
      private class Handler extends Thread {
    // 代码.... 此处省略
        @Override
        public void run() {
         // 代码.... 此处省略
          while (running) {
            try {
              final Call call = callQueue.take(); // pop the queue; maybe blocked here
               // 代码.... 此处省略
              try { 
                if (call.connection.user == null) {
                  value = call(call.connection.protocol, call.param, 
                               call.timestamp);
                } else {
                }
              } catch (Throwable e) {
                // 代码.... 此处省略
              }
              // 代码.... 此处省略
              synchronized (call.connection.responseQueue) {           
                // 代码.... 此处省略
                setupResponse(buf, call, 
                            (error == null) ? Status.SUCCESS : Status.ERROR, 
                            value, errorClass, error);
                responder.doRespond(call);
              }
            } catch (InterruptedException e) {
              // 代码.... 此处省略
            } catch (Exception e) {
              // 代码.... 此处省略
            }
          }
          // 代码.... 此处省略
        }
    
      }
    

    纵观这几个线程的主体,发现Responder和Listener的代码很熟悉,NIO里的知识。Listener负责监听op_accept事件,然后调用doAccept()方法处理连接;Responder负责监听op_write事件,然后调用doAsyncWrite()方法;Handler里只能大致知道调用了Server.call()这个抽象方法(应该会在某个地方实现)得到了value,然后setupResponse()把处理结果关联到Call,再用responder.doRespond()向客户端做出回应,至于Call,Connection,这也正是我们还没有弄清楚的几个类;还有Call是从一个叫做callQueue的变量里拿到的,这个变量也成为了我们进一步需要关心的地方。

    目前能知道的就是:Listener是监听连接的,但对连接是如何处理的还需要解读doRead()方法;Handler是处理业务逻辑的,起点是存放在callQueue中的Call,Call又与Connection联系密切,但这2个类的作用还未知,处理完之后调用responder.doRespond()做出回应,不过Responder功能不仅仅如此,还负责doAsyncWrite()。

    所以,接下来的任务是分析一下Call类、Connection类、callQueue变量、doAccept()方法、doRespond()方法和doAsyncWrite()方法。

    Call、Connection、callQueue、doAccept()、doRespond()、doAsyncWrite()

    1. Call类
      /** A call queued for handling. */
      private static class Call {
        private int id;                               // the client's call id
        private Writable param;                       // the parameter passed
        private Connection connection;                // connection to client
        private long timestamp;     // the time received when response is null
                                       // the time served when response is not null
        private ByteBuffer response;                      // the response for this call
    
        public Call(int id, Writable param, Connection connection) { 
          this.id = id;
          this.param = param;
          this.connection = connection;
          this.timestamp = System.currentTimeMillis();
          this.response = null;
        }
        
        @Override
        public String toString() {
          return param.toString() + " from " + connection.toString();
        }
    
        public void setResponse(ByteBuffer response) {
          this.response = response;
        }
      }
    
    1. Connection类
      /** Reads calls from a connection and queues them for handling. */
      public class Connection {
          // 代码.... 此处省略
        public Connection(SelectionKey key, SocketChannel channel, 
                          long lastContact) {
          this.channel = channel;
          this.lastContact = lastContact;
          this.data = null;
          this.dataLengthBuffer = ByteBuffer.allocate(4);
          this.unwrappedData = null;
          this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
          this.socket = channel.socket();
          this.addr = socket.getInetAddress();
          if (addr == null) {
            this.hostAddress = "*Unknown*";
          } else {
            this.hostAddress = addr.getHostAddress();
          }
          this.remotePort = socket.getPort();
          this.responseQueue = new LinkedList<Call>();
          if (socketSendBufferSize != 0) {
            try {
              socket.setSendBufferSize(socketSendBufferSize);
            } catch (IOException e) {
            }
          }
        } 
      }
    

    Call类的代码比较少,联系RPC的目的,可以分析出这个类是对RPC请求的封装,有传递的参数param,还有连接客户端的Connection,以及处理的结果response。而Connection类的成员变量多,方法也多,所以观察一下构造器,留意到变量responseQueue,应该是用来存放经过handle之后的Call。

    1. callQueue变量
    public abstract class Server {
    //省略代码
      private BlockingQueue<Call> callQueue;
    //省略代码
    }
    

    callQueue是一个全局变量,专门用来存放封装请求的Call。call从哪生产,又是被谁消费呢。使用eclipse的Call Hierarchy查看一下调用层次。


    callQueue

    依次查看可以发现在Connection的processData()方法里面出现了

        private void processData(byte[] buf) throws  IOException, InterruptedException {
         //省略代码
          Call call = new Call(id, param, this);
          callQueue.put(call);   
         //省略代码
        }
    

    Call将一些参数封装,并放入队列callQueue中。这些参数是从字节数组buf里读到的,所以继续往上找:

    call

    终于找到了我们认识的Listener,点开doRead()方法。

        void doRead(SelectionKey key) throws InterruptedException {
          int count = 0;
          Connection c = (Connection)key.attachment();
          //代码....此处省略
          try {
            count = c.readAndProcess();
          } catch (InterruptedException ieo) {
            //代码....此处省略
          }
           //代码....此处省略
        }
    

    NIO里的SelectionKey对象,doRead()方法中将Connection从SelectionKey中取出,然后通过Connection的readAndRrocess()方法封装call,也就是doRead()中生产了Call,并存放在callQueue中。

        public int readAndProcess() throws IOException, InterruptedException {
            //代码....此处省略
            count = channelRead(channel, data);
            //代码....此处省略
              if (useSasl) {
                saslReadAndProcess(data.array());
              } else {
                processOneRpc(data.array());
              }
    }
    

    readAndProcess()是从channel中读取传递过来的字节,然后从里拿到封装Call需要的那些参数,至于具体的细节就不再钻了。

    1. doAccept()
       void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
          Connection c = null;
          ServerSocketChannel server = (ServerSocketChannel) key.channel();
          SocketChannel channel;
          while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            Reader reader = getReader();
            try {
              reader.startAdd();
              SelectionKey readKey = reader.registerChannel(channel);
              c = new Connection(readKey, channel, System.currentTimeMillis());
              readKey.attach(c);
              synchronized (connectionList) {
                connectionList.add(numConnections, c);
                numConnections++;
              }
             //代码....此处省略       
            } finally {
              reader.finishAdd(); 
            }
    
          }
        }
    

    doAccept()中生产了Connection并attach到SelectionKey对象中。 这里涉及到一个新的类Reader,我们看看Reader是干什么用的。

        private class Reader implements Runnable {
          private volatile boolean adding = false;
          private Selector readSelector = null;
    
          Reader(Selector readSelector) {
            this.readSelector = readSelector;
          }
          public void run() {
            LOG.info("Starting SocketReader");
            synchronized (this) {
              while (running) {
                SelectionKey key = null;
                try {
                  readSelector.select();
                  while (adding) {
                    this.wait(1000);
                  }              
    
                  Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                  while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();
                    if (key.isValid()) {
                      if (key.isReadable()) {
                        doRead(key);
                      }
                    }
                    key = null;
                  }
                } catch (InterruptedException e) {
                  //代码....此处省略
                } catch (IOException ex) {
                  //代码....此处省略
                }
              }
            }
          }
        }
    
    

    Reader继承自Thread,那么就要搞清楚是在哪里启动的线程。调用Call Hierarchy查看,发现是在Listener初始化的时候启动的,代码如下:

        public Listener() throws IOException {
          //代码....此处省略
          for (int i = 0; i < readThreads; i++) {
            Selector readSelector = Selector.open();
            Reader reader = new Reader(readSelector);
            readers[i] = reader;
            readPool.execute(reader);
          }
          //代码....此处省略
        }
    

    Reader的线程体主要是通过doRead()在解析请求,从上面我们知道了doRead()内部是使用Connection.readAndProcess()来解析的。

    1. doRespond()
        void doRespond(Call call) throws IOException {
          synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
              processResponse(call.connection.responseQueue, true);
            }
          }
        }
    
    

    doRespond()调用了processResponse():

        private boolean processResponse(LinkedList<Call> responseQueue,
                                        boolean inHandler) throws IOException {
              //代码....此处省略
          try {
            synchronized (responseQueue) {
              //代码....此处省略
              call = responseQueue.removeFirst();
              SocketChannel channel = call.connection.channel;
              //代码....此处省略
              //
              // Send as much data as we can in the non-blocking fashion
              //
              int numBytes = channelWrite(channel, call.response);
              if (!call.response.hasRemaining()) {
                if (inHandler) {
                  //代码....此处省略
                  try {
                    // Wakeup the thread blocked on select, only then can the call 
                    // to channel.register() complete.
                    writeSelector.wakeup();
                    channel.register(writeSelector, SelectionKey.OP_WRITE, call);
                  } catch (ClosedChannelException e) {
                    //Its ok. channel might be closed else where.
                    done = true;
                  } finally {
                     //代码....此处省略
                  }
                }
          } finally {
            //代码....此处省略
            }
          }
          return done;
        }
    

    channelWrite(channel, call.response)把处理的结果返回给客户端,“Send as much data as we can in the non-blocking fashion”,如果有剩余的data就会注册写事件:
    channel.register(writeSelector, SelectionKey.OP_WRITE, call),也就会调用doAysnWrite()去处理剩下的数据。

    到这里大概的原理就清楚了,画一个不科学的示意图(但比较直观哈),如下:

    Server

    总结一下就是:

    (1)接收请求
    该阶段主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至于每个Reader线程负责哪些客户端连接,完全由Listener决定,当前Listener只是采用了简单的轮询分配机制。

    (2)处理请求
    该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。

    Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务交给Responder线程。

    (3)返回结果
    前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。

    Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。

    相关文章

      网友评论

          本文标题:Hadoop RPC源码分析(一)

          本文链接:https://www.haomeiwen.com/subject/ltvvzxtx.html