Analysis of the communication process between HBase and its clients

Author: tinyMonkey | Published 2018-06-06 09:52

    HBase's client-server communication mainly involves two technologies: Google's protobuf RPC framework and Java NIO.

    Startup entry points

    org.apache.hadoop.hbase.regionserver.HRegionServer is the startup class of the RegionServer;
    org.apache.hadoop.hbase.master.HMaster is the startup class of the HMaster and extends HRegionServer;
    HRegionServer declares an org.apache.hadoop.hbase.regionserver.RSRpcServices field:

    private RSRpcServices rsRpcServices;
    
    • RSRpcServices implements the core handling of every client request;
    • RSRpcServices relies on one key class, RpcServer, to communicate with clients:
          rpcServer = new RpcServer(rs, name, getServices(),
              bindAddress, // use final bindAddress for this server.
              rs.conf,
              rpcSchedulerFactory.create(rs.conf, this, rs));
    

    The two classes at the heart of the whole communication path are therefore RSRpcServices and RpcServer.
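    To make the relationship concrete, here is a simplified sketch (based on the 1.x code base; the list type and constructor arguments are assumptions, not a verbatim quote) of how RSRpcServices hands its protobuf services to the RpcServer it creates:

    // Simplified sketch, not the exact HBase source: RSRpcServices wraps itself
    // (it implements the generated BlockingInterfaces) into BlockingService objects
    // and passes them to RpcServer, which dispatches incoming calls to them.
    List<RpcServer.BlockingServiceAndInterface> getServices() {
      List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>();
      bssi.add(new RpcServer.BlockingServiceAndInterface(
          ClientService.newReflectiveBlockingService(this),
          ClientService.BlockingInterface.class));
      bssi.add(new RpcServer.BlockingServiceAndInterface(
          AdminService.newReflectiveBlockingService(this),
          AdminService.BlockingInterface.class));
      return bssi;
    }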

    Implementing RPC with Google's protobuf

    HBase's protobuf workflow is as follows:

    1. Define the ClientService service in Client.proto, then use the protoc compiler to generate the corresponding service interface and message classes:
    service ClientService {
      rpc Get(GetRequest)
        returns(GetResponse);

      rpc Mutate(MutateRequest)
        returns(MutateResponse);

      rpc Scan(ScanRequest)
        returns(ScanResponse);
    
      rpc BulkLoadHFile(BulkLoadHFileRequest)
        returns(BulkLoadHFileResponse);
    
      rpc ExecService(CoprocessorServiceRequest)
        returns(CoprocessorServiceResponse);
        
      rpc ExecRegionServerService(CoprocessorServiceRequest)
        returns(CoprocessorServiceResponse);
    
      rpc Multi(MultiRequest)
        returns(MultiResponse);
    }
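
    Running protoc on this definition produces, among other classes, a blocking service interface that the server must implement. Roughly (a simplified sketch of the generated code, with package names shortened):

    // Simplified sketch of the generated ClientService.BlockingInterface;
    // there is one method per rpc declared in Client.proto.
    public interface BlockingInterface {
      GetResponse get(RpcController controller, GetRequest request)
          throws ServiceException;

      ScanResponse scan(RpcController controller, ScanRequest request)
          throws ServiceException;

      // ... mutate, bulkLoadHFile, execService, execRegionServerService, multi
    }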
    
    2. On the server side, the class RSRpcServices implements the generated blocking interfaces:
    • org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
    • org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
      RSRpcServices is therefore the concrete implementation of every request, since it implements both AdminService and ClientService (a simplified sketch of one such method follows).
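    As an illustration (not the actual RSRpcServices source; error handling, coprocessor hooks, quotas and metrics are omitted), the implementation of the Get rpc looks roughly like this:

    // Illustrative sketch of RSRpcServices.get(): translate the protobuf request
    // into a client-level Get, execute it against the region, and translate the
    // result back into a protobuf GetResponse.
    @Override
    public GetResponse get(RpcController controller, GetRequest request)
        throws ServiceException {
      try {
        Region region = getRegion(request.getRegion());        // locate the target region
        Get clientGet = ProtobufUtil.toGet(request.getGet());  // protobuf -> client Get
        Result r = region.get(clientGet);                      // the actual read
        GetResponse.Builder builder = GetResponse.newBuilder();
        builder.setResult(ProtobufUtil.toResult(r));           // client Result -> protobuf
        return builder.build();
      } catch (IOException ie) {
        throw new ServiceException(ie);
      }
    }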
    3. On the server side, call ClientService.newReflectiveBlockingService(final BlockingInterface impl) to obtain the corresponding com.google.protobuf.BlockingService implementation:
     public static com.google.protobuf.BlockingService
            newReflectiveBlockingService(final BlockingInterface impl) {
          return new com.google.protobuf.BlockingService() {
            public final com.google.protobuf.Descriptors.ServiceDescriptor
                getDescriptorForType() {
              return getDescriptor();
            }
    
            public final com.google.protobuf.Message callBlockingMethod(
                com.google.protobuf.Descriptors.MethodDescriptor method,
                com.google.protobuf.RpcController controller,
                com.google.protobuf.Message request)
                throws com.google.protobuf.ServiceException {
              if (method.getService() != getDescriptor()) {
                throw new java.lang.IllegalArgumentException(
                  "Service.callBlockingMethod() given method descriptor for " +
                  "wrong service type.");
              }
              switch(method.getIndex()) {
                case 0:
                  return impl.get(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest)request);
                case 1:
                  return impl.mutate(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest)request);
                case 2:
                  return impl.scan(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest)request);
                case 3:
                  return impl.bulkLoadHFile(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest)request);
                case 4:
                  return impl.execService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
                case 5:
                  return impl.execRegionServerService(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest)request);
                case 6:
                  return impl.multi(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request);
                default:
                  throw new java.lang.AssertionError("Can't get here.");
              }
            }

            // ... remaining generated methods (getRequestPrototype, getResponsePrototype, etc.) omitted
          };
        }
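
    Turning the concrete implementation into a generic, dispatchable service then takes a single call (a sketch; rsRpcServices stands for the RSRpcServices instance):

    // The reflective BlockingService lets RpcServer invoke any rpc by its
    // MethodDescriptor without knowing the concrete implementation class.
    com.google.protobuf.BlockingService service =
        ClientProtos.ClientService.newReflectiveBlockingService(rsRpcServices);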
    
    4. Parse the request sent by the client and map it to a MethodDescriptor and a Message object.

    This step is essentially the job of the Java NIO layer discussed below.

    5. Use the BlockingService's callBlockingMethod method to process the client's request:

    Message callBlockingMethod(MethodDescriptor var1, RpcController var2, Message var3) throws ServiceException;
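
    Putting steps 4 and 5 together, the dispatch performed by the server once the raw bytes have been read looks roughly like this (a hedged sketch; methodName, requestBytes and controller are placeholders for values extracted from the request header and body, and checked exceptions are omitted):

    // Resolve the method by name, parse the request body into the right Message
    // type using the method's request prototype, then invoke the blocking service.
    Descriptors.MethodDescriptor md =
        service.getDescriptorForType().findMethodByName(methodName);  // e.g. "Get"
    Message param = service.getRequestPrototype(md)
        .newBuilderForType()
        .mergeFrom(requestBytes)   // bytes read off the socket
        .build();
    Message result = service.callBlockingMethod(md, controller, param);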

    Java NIO

    The Java NIO code is concentrated in the RpcServer class. Its central piece is a Listener which, despite the name, is not the usual listener/observer pattern but a thread that listens for incoming connection requests:

    // Start the listener here and let it bind to the port
        listener = new Listener(name);
    

    Its constructor is implemented as follows:

    public Listener(final String name) throws IOException {
          super(name);
          backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
          // Create a new server socket and set to non blocking mode
          acceptChannel = ServerSocketChannel.open();
          acceptChannel.configureBlocking(false);
    
      // Bind the server socket to the binding address (can be different from the default interface)
          bind(acceptChannel.socket(), bindAddress, backlogLength);
          port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
          address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
    
          // create a selector;
          selector= Selector.open();
    
          readers = new Reader[readThreads];
          readPool = Executors.newFixedThreadPool(readThreads,
            new ThreadFactoryBuilder().setNameFormat(
              "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
              ",port=" + port).setDaemon(true).build());
          for (int i = 0; i < readThreads; ++i) {
            Reader reader = new Reader();
            readers[i] = reader;
            readPool.execute(reader);
          }
          LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
    
          // Register accepts on the server socket with the selector.
          acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
          this.setName("RpcServer.listener,port=" + port);
          this.setDaemon(true);
        }
    

    The Listener thus owns one acceptChannel and one selector for accepting connections, plus several Reader threads, each with its own selector. A simplified sketch of what a Reader thread does is shown below.
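    The Reader threads themselves are not shown in the constructor above; roughly, each one loops over its own selector like this (a simplified sketch, not the actual HBase source; the startAdd/finishAdd synchronization used by doAccept below, as well as the 'running' flag, LOG and doRead helpers, are assumed from context):

    // Sketch of a Reader: the Listener hands accepted channels to it via
    // registerChannel(); the run() loop then waits for readable channels and
    // lets doRead() pull bytes off the socket and call connection.process().
    private class Reader implements Runnable {
      private final Selector readSelector;

      Reader() throws IOException {
        this.readSelector = Selector.open();
      }

      // Called from the Listener's doAccept() for every new connection.
      SelectionKey registerChannel(SocketChannel channel) throws IOException {
        return channel.register(readSelector, SelectionKey.OP_READ);
      }

      @Override
      public void run() {
        while (running) {
          try {
            readSelector.select();
            Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
            while (it.hasNext()) {
              SelectionKey key = it.next();
              it.remove();
              if (key.isValid() && key.isReadable()) {
                doRead(key);   // reads the request bytes and processes them
              }
            }
          } catch (IOException e) {
            LOG.warn("Reader loop error", e);
          }
        }
      }
    }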

    1. Accepting connections

    The Listener's main thread watches its selector for OP_ACCEPT events and handles each one in doAccept:

     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
          Connection c;
          ServerSocketChannel server = (ServerSocketChannel) key.channel();
    
          SocketChannel channel;
          while ((channel = server.accept()) != null) {
            try {
              channel.configureBlocking(false);
              channel.socket().setTcpNoDelay(tcpNoDelay);
              channel.socket().setKeepAlive(tcpKeepAlive);
            } catch (IOException ioe) {
              channel.close();
              throw ioe;
            }
    
            Reader reader = getReader();
            try {
              reader.startAdd();
              SelectionKey readKey = reader.registerChannel(channel);
              c = getConnection(channel, System.currentTimeMillis());
              readKey.attach(c);
              synchronized (connectionList) {
                connectionList.add(numConnections, c);
                numConnections++;
              }
              if (LOG.isDebugEnabled())
                LOG.debug(getName() + ": connection from " + c.toString() +
                    "; # active connections: " + numConnections);
            } finally {
              reader.finishAdd();
            }
          }
        }
    

    For every accepted connection it creates a Connection object; each Connection has its own SocketChannel for reading and writing, and that channel is registered with one of the Readers' selectors.

    2. Reading and processing

    Each Reader thread goes through the SelectionKeys registered with its own selector; when data arrives it works on the Connection attached to the key and eventually calls the connection's process method, which parses the received request, wraps it in a Call, and dispatches it through the scheduler:

    Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
                  totalRequestSize, traceInfo, this.addr);
    
          if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
            callQueueSize.add(-1 * call.getSize());
    

    The scheduler is the core of request handling on the RegionServer. It is created with the parameter shown below, so hbase.regionserver.handler.count determines how many handlers process requests at the same time, i.e. the RegionServer's concurrency.

    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
     public static final String REGION_SERVER_HANDLER_COUNT = "hbase.regionserver.handler.count";
    

    Finally, RpcServer invokes the call through the BlockingService:
    Message result = service.callBlockingMethod(md, controller, param);
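
    Conceptually, the scheduler plus its handlers behave like a bounded queue drained by a fixed pool of threads. The sketch below only illustrates that model; it is not the actual RpcScheduler/SimpleRpcScheduler implementation (handlerCount and maxQueueLength stand for the configuration values mentioned above):

    // Illustrative analogue of the scheduler/handler model: handlerCount threads
    // take CallRunner tasks off a bounded queue; each task ends in
    // service.callBlockingMethod(md, controller, param).
    class SimpleSchedulerSketch {
      private final BlockingQueue<CallRunner> callQueue;

      SimpleSchedulerSketch(int handlerCount, int maxQueueLength) {
        this.callQueue = new LinkedBlockingQueue<>(maxQueueLength);
        ExecutorService handlers = Executors.newFixedThreadPool(handlerCount);
        for (int i = 0; i < handlerCount; i++) {
          handlers.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
              try {
                callQueue.take().run();   // CallRunner.run() executes the rpc
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            }
          });
        }
      }

      // Mirrors scheduler.dispatch(...): returns false when the queue is full,
      // which is the branch handled in the code above.
      boolean dispatch(CallRunner task) {
        return callQueue.offer(task);
      }
    }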

    3. Serializing the response

    As mentioned above, the actual work is executed inside CallRunner; when it finishes, Call.setResponse is called:

     protected synchronized void setResponse(Object m, final CellScanner cells,
            Throwable t, String errorMsg) {
           ...
            Message header = headerBuilder.build();
    
            // Organize the response as a set of bytebuffers rather than collect it all together inside
            // one big byte array; save on allocations.
            ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
            ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
            int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
              (this.cellBlock == null? 0: this.cellBlock.limit());
            ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
            bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
           ...
          } catch (IOException e) {
            LOG.warn("Exception while creating response " + e);
          }
          this.response = bc;
        }
    

    Here IPCUtil.getDelimitedMessageAsByteBuffer(result) serializes the Message into a ByteBuffer, using the serialization provided by Google's com.google.protobuf.CodedOutputStream:

    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
        // This will write out the vint preamble and the message serialized.
        cos.writeMessageNoTag(m);
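
    On the wire the response is therefore a 4-byte total length, followed by a varint-delimited ResponseHeader, a varint-delimited result message and, optionally, a cell block. A hedged sketch of how a client could read it back from an InputStream (GetResponse is just an example result type; socketInputStream is assumed, and checked exceptions are omitted):

    // Read back the length-prefixed response produced by setResponse().
    DataInputStream in = new DataInputStream(socketInputStream);
    int totalSize = in.readInt();                              // the Bytes.toBytes(totalSize) prefix
    RPCProtos.ResponseHeader header =
        RPCProtos.ResponseHeader.parseDelimitedFrom(in);       // varint length + header bytes
    ClientProtos.GetResponse result =
        ClientProtos.GetResponse.parseDelimitedFrom(in);       // varint length + message bytes
    // Any remaining bytes up to totalSize would be the cell block.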
    
    4. Writing the response back to the channel

    The Responder provides the following method:

     void doRespond(Call call) throws IOException {
          boolean added = false;
    
          // If there is already a write in progress, we don't wait. This allows to free the handlers
          //  immediately for other tasks.
          if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
            try {
              if (call.connection.responseQueue.isEmpty()) {
                // If we're alone, we can try to do a direct call to the socket. It's
                //  an optimisation to save on context switches and data transfer between cores..
                if (processResponse(call)) {
                  return; // we're done.
                }
                // Too big to fit, putting ahead.
                call.connection.responseQueue.addFirst(call);
                added = true; // We will register to the selector later, outside of the lock.
              }
            } finally {
              call.connection.responseWriteLock.unlock();
            }
          }
    
          if (!added) {
            call.connection.responseQueue.addLast(call);
          }
          call.responder.registerForWrite(call.connection);
    
          // set the serve time when the response has to be sent later
          call.timestamp = System.currentTimeMillis();
        }
    

    In the end the response bytes are written into the connection's own channel.
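    That write is conceptually a gathering write of the BufferChain's buffers into the connection's SocketChannel; a hedged sketch of the final step (the buffers come from the BufferChain built in setResponse, and channel is the connection's SocketChannel):

    // Gathering write of the response buffers (total size, header, result,
    // cell block) into the client's channel.
    ByteBuffer[] buffers = { bbTotalSize, bbHeader, bbResult, cellBlock };
    long written = channel.write(buffers, 0, buffers.length);
    // If the channel's send buffer is full and not everything was written, the
    // Call stays on the responseQueue and the channel is registered for
    // OP_WRITE, as doRespond() above shows.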
