美文网首页Hadoop
Hadoop YARN 源码阅读(一)Hadoop RPC 源码

Hadoop YARN 源码阅读(一)Hadoop RPC 源码

作者: hansiming | 来源:发表于2020-04-16 16:58 被阅读0次

RPC简介

RPC是一种通过网络从远程计算机上请求服务的机制,封装了具体实现,使用户不需要了解底层网络技术。目前存在许多开源RPC框架,比较有名的有Thrift、Protocol Buffers和Avro。Hadoop RPC与他们一样,均由两部分组成:对象序列化和远程过程调用。

通俗来讲,以接口和接口中的方法作为协议,然后客户端向服务端发送请求,请求中包括具体的接口和接口方法,而实际的操作在服务端执行,服务端执行完成之后,再通过网络将响应传回给客户端。

在整个RPC调用中具体包括了四层:

  • 序列化层:序列化主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储,在RPC框架中。我们常见的有Java JDK默认实现的序列化,Google开源的Protocol Buffers等等,在Hadoop2.6.0中已经提供了Hadoop默认的基于Writeable和Protocol Buffers的序列化方式。
  • 函数调用层:函数调用层主要功能是定位要调用的函数并执行该函数,如上图所示,Hadoop RPC采用了Java反射机制(服务器端)与动态代理(客户端)实现了函数调用。
  • 网络传输层:网络传输层描述了Client与Server之间消息传输的方式,Hadoop RPC采用了基于TCP/IP的Socket机制。
  • 服务器端处理框架:服务器端处理框架可被抽象为网络I/O模型,它描述了客户端与服务器端间信息交互方式。

预备知识

Hadoop RPC Demo

下面先一步一步的使用Hadoop实现的RPC框架来实现RPC调用。

  • 首先添加Maven依赖,Hadoop RPC主要是在common包中实现,在本文章中使用2.6.0的Hadoop版本。
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
  • 实现ClientProtocol类,该类定义了接口协议,在该接口中,我们定义了两个方法,echo和add。值得注意的是,在Hadoop中所有的协议接口都需要实现VersionedProtocol接口。
package com.cszjo.hadoop.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.IOException;

/**
 * RPC Protocol 类
 * Created by hansiming on 2017/9/8.
 */
public interface ClientProtocol extends VersionedProtocol {
    public static final long versionID = 1L;
    String echo(String value) throws IOException;
    int add(int a, int b) throws IOException;
}
  • 客户端类,可以看到通过RPC.getProxy方法得到了客户端的代理类,暂时先不管内部的具体实现。
package com.cszjo.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * 客户端类
 * Created by hansiming on 2017/9/8.
 */
public class Client {
    public static final String ADDRESS = "localhost";
    public static final int PORT = 9010;
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //server ip and port
        InetSocketAddress inetSocketAddress = new InetSocketAddress(ADDRESS, PORT);
        //get client proxy
        ClientProtocol client = RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, inetSocketAddress, conf);
        System.out.println(client.add(3, 5));
        System.out.println(client.echo("rpc!"));
    }
}
  • 现在来到服务端,首先编写具体的实现类ClientProtocolImpl,实现了协议ClientProtocol接口。
package com.cszjo.hadoop.rpc;

import org.apache.hadoop.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * 具体实现类
 * Created by hansiming on 2017/9/8.
 */
public class ClientProtocolImpl implements ClientProtocol {
    public String echo(String value) throws IOException {
        return value;
    }
    public int add(int a, int b) throws IOException {
        return a + b;
    }
    public long getProtocolVersion(String s, long l) throws IOException {
        return ClientProtocol.versionID;
    }
    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
        return new ProtocolSignature(ClientProtocol.versionID, null);
    }
}
  • 服务端类,我们可以看见除了ip和port之外,还有一些其他的参数。
package com.cszjo.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

/**
 * Created by hansiming on 2017/9/8.
 */
public class Server {
    public static final String ADDRESS = "localhost";
    public static final int PORT = 9010;
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //set server ip, port, protocol instance, protocol, handler num
        org.apache.hadoop.ipc.Server server = new RPC.Builder(conf).setBindAddress(ADDRESS)
                .setPort(PORT).setInstance(new ClientProtocolImpl()).setProtocol(ClientProtocol.class)
                .setNumHandlers(5).build();
        server.start();
    }
}
  • 下面我们先启动Server类,在启动Client类去调用Server端,就可以返回我们想要的结果,一个简单的Hadoop RPC就实现了。
8
rpc!

Process finished with exit code 0

Hadoop RPC 源码分析

Client端

通过这样的一个方法拿到了Client的代理对象,getProxy方法存在于RPC类中。

ClientProtocol client = RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, inetSocketAddress, conf);

打开RPC类查看源码。

public static <T> T getProxy(Class<T> protocol,
                                 long clientVersion,
                                 InetSocketAddress addr, Configuration conf)
     throws IOException {
     return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
   }

追着getProtocolProxy方法跟下来。

 public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                              long clientVersion,
                              InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf,
                              SocketFactory factory,
                              int rpcTimeout,
                              RetryPolicy connectionRetryPolicy,
                              AtomicBoolean fallbackToSimpleAuth)
     throws IOException {
  if (UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
  }
   //首先拿到protocol engine,然后再拿到ProtocolProxy
  return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
      addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
      fallbackToSimpleAuth);
}

跟进getProtocolEngine方法,该方法主要是得到protocol的引擎。

// 返回一个rpc的引擎
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
    Configuration conf) {
  //PROTOCOL_ENGINES private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
  //PROTOCOL_ENGINES 缓存了protocol类对应的RpcEngine的映射
  RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  if (engine == null) {
    //从conf配置中去得到rpc.engine.{protocol_name}
    //如果没有配置,则默认使用WritableRpcEngine
    Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                  WritableRpcEngine.class);
    //使用反射得到RpcEngine
    engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
    //加入缓存
    PROTOCOL_ENGINES.put(protocol, engine);
  }
  return engine;
}

然后再是ProtocolEngine的getProxy方法,在Hadoop中默认提供了两种ProtocolEngine。

  • 一种是WritableRpcEngine,使用的是Hadoop默认的序列化方式,所有要序列化的对象需要实现Writeable接口。
  • 一种是ProtobufRpcEngine,使用的是Google的Protocol buffers作为序列化方式。我们这里使用默认的WritableRpcEngine来讲解,进入上面的getProxy方法。
/**
 * 构造一个client端的实现了protocol接口的代理对象,该代理对象与指定的server端进行交互
 */
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                       InetSocketAddress addr, UserGroupInformation ticket,
                       Configuration conf, SocketFactory factory,
                       int rpcTimeout, RetryPolicy connectionRetryPolicy,
                       AtomicBoolean fallbackToSimpleAuth)
  throws IOException {    
  if (connectionRetryPolicy != null) {
    throw new UnsupportedOperationException(
        "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
  }
  //通过动态代理得到protocol接口的代理对象,关键在与Invoker类,实现了代理对象的具体实现。
  T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
      new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
          factory, rpcTimeout, fallbackToSimpleAuth));
  //ProtocolProxy是一个包装类,里面有代理对象和protocol接口类
  return new ProtocolProxy<T>(protocol, proxy, true);
}

我们打开关键的Invoker类,该类是WritableRpcEngine类的一个内部类。

//实现RpcInvocationHandler,该接口实现了InvocationHandler
private static class Invoker implements RpcInvocationHandler {
  // Client类下面的ConnectionId,包含了与server端连接的信息,包括ip,port,ticket,具体的protocol接口等等。
  private Client.ConnectionId remoteId;
  //Client类
  private Client client;
  
...
  public Invoker(Class<?> protocol,
                 InetSocketAddress address, UserGroupInformation ticket,
                 Configuration conf, SocketFactory factory,
                 int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
        ticket, rpcTimeout, conf);
    //ClientCache CLIENTS=new ClientCache();
    //ClientCache是一个保存了SocketFactory映射Client的缓存。
    //如果SocketFactory的key不存在,则重新实例化一个Client类。
    this.client = CLIENTS.getClient(conf, factory);
    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
  }

在一个实现了InvocationHandler接口的类中,最关键的就是invoke方法。

@Override
    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
  ...
  ObjectWritable value;
      try {
        //invoke方法最关键的地方就是调用了client的call方法。
        //也就是代理对象的每个方法,都会调用client的call方法。
        //其中的参数:RpcKind --> 指定rpc的方式,这里指定了RPC_WRITABLE,如果是ProtoBuf,则指定为RPC_PROTOCOL_BUFFER
        // rpcRequest --> 指定了Request对象,具体就是需要序列化传到Server端的具体内容,我们可以看到,这里把方法和参数包装在了Invocation的类里。
        // remoteId --> 类型为ConnectionId
        value = (ObjectWritable)
          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
            remoteId, fallbackToSimpleAuth);
      } 
  ...

查看Invocation类。

//一个方法的实现,包含了方法名和它的参数,实现了Writable接口,所以可以被Hadoop序列化
private static class Invocation implements Writable, Configurable {
  //方法名
  private String methodName;
  //参数类型
  private Class<?>[] parameterClasses;
  //参数
  private Object[] parameters;
...
public Invocation(Method method, Object[] parameters) {
    this.methodName = method.getName();
    this.parameterClasses = method.getParameterTypes();
    this.parameters = parameters;
  ...
}
//在该类中还实现了Writable接口中的write和readFields方法,主要是写和读的具体实现,感兴趣可以查看。

我们可以看到,最终代理对象执行的是Client类的call方法,所以接下来我们会分析Client这个类,我们先大概看下Client中存在的一些内部类。

[图片上传失败...(image-545a2b-1587027473964)]

  • Connection:每个Connection都包含了一个和远程地址连接了的socket,Connection是一个线程类,循环的去读server端返回的响应,然后在通知调用者。
  • Call:代表了一个RPC调用。
  • ConnectionId:一个ConnectionId唯一的标注一个Connection,主要通过以下属性

接上面的,我们来看看Client的call方法究竟做了什么。

//执行一个rpc调用,传送rpcRequest到服务端,并返回rpc response
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  //创建一个Call对象,一个Call对象就表示了一次Rpc调用,主要是包含了rpcKind,rpcRequest,rpcResponse
  final Call call = createCall(rpcKind, rpcRequest);
  //得到Connection对象,在getConnection方法里面的setupIOstreams方法最后启动Connection线程
  Connection connection = getConnection(remoteId, call, serviceClass,
    fallbackToSimpleAuth);
  try {
    // 通过connection对象去发送一次rpc调用
    connection.sendRpcRequest(call);                 // send the rpc request
  } catch (RejectedExecutionException e) {
    throw new IOException("connection has been closed", e);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.warn("interrupted waiting to send rpc request to server", e);
    throw new IOException(e);
  }
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        //等待唤醒
        //Connection线程类会不断的去轮询,查看是否有response到达,如果达到,则会唤醒call
        call.wait();                           // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    if (interrupted) {
      // set the interrupt flag now that we are done waiting
      Thread.currentThread().interrupt();
    }
    if (call.error != null) {
      //调用失败,处理error
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        throw call.error;
      } else { // local exception
        InetSocketAddress address = connection.getRemoteAddress();
        throw NetUtils.wrapException(address.getHostName(),
                address.getPort(),
                NetUtils.getHostname(),
                0,
                call.error);
      }
    } else {
      //返回response
      return call.getRpcResponse();
    }
  }
}

来看看Connection类,具体的实现,Connection是一个线程类,我们先看看run方法具体是在执行什么。

@Override
public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections " 
        + connections.size());
  try {
    //等待read操作或者关闭connection,如果读到了response,则返回true
    while (waitForWork()) {//wait here for work - read or close connection
      //接收rpc response
      receiveRpcResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }
  
//关闭连接
  close();
  
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}

waitForWork方法

//1.等待直到有人告诉我们去开始read rpc response 返回true
//2.空闲了太长的时间,被标记为关闭,最大空闲时间为10s 返回false
//3.client被标记为不再运行 返回false。
private synchronized boolean waitForWork() {
  //calls表示保存的是当前存活的call
  //如果calls是空的,则等待maxIdleTime的时间
  if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
    long timeout = maxIdleTime-
          (Time.now()-lastActivity.get());
    if (timeout>0) {
      try {
        wait(timeout);
      } catch (InterruptedException e) {}
    }
  }
  
  if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
    return true;
  } else if (shouldCloseConnection.get()) {
    //连接被关闭
    return false;
  } else if (calls.isEmpty()) { // idle connection closed or stopped
    //超过了最长的等待时间
    markClosed(null);
    return false;
  } else { // get stopped but there are still pending requests 
    markClosed((IOException)new IOException().initCause(
        new InterruptedException()));
    return false;
  }
}

receiveRpcResponse方法就不列出来了,大概就是从流中返序列化出来server端的结果。从上面我们可以看出Connection是一个线程类,会等待结果的返回。Connection的启动方法在getConnection方法里面的setupIOstreams方法最后。下面我们来看Connection类里的另一个关键的方法,sendRpcRequest方法。

//初始化一个rpc调用,发送一个rpc request到远程的服务端。
public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  if (shouldCloseConnection.get()) {
    return;
  }
    //整个rpc request由三部分组成
  // 1. header + rpc request的长度
  // 2. header
  // 3. rpc request
  /**
   * 而一个rpc的header由如下组成
   * +----------------------------------+
   * |  "hrpc" 4 bytes                  |      
   * +----------------------------------+
   * |  Version (1 byte)                |
   * +----------------------------------+
   * |  Service Class (1 byte)          |
   * +----------------------------------+
   * |  AuthProtocol (1 byte)           |      
   * +----------------------------------+
   */
  final DataOutputBuffer d = new DataOutputBuffer();
  //request header主要包括了rpc的类型
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  //将header写入到outputBuffer缓冲区中
  header.writeDelimitedTo(d);
  //将rpcRequest也写入到outputBuffer缓冲区中,跟在header之后
  call.rpcRequest.write(d);
  synchronized (sendRpcRequestLock) {
    //sendParamsExecutor是Client类的一个线程池对象,所以真实的sendRequest实现并不在Connection类中。
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      @Override
      public void run() {
        try {
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }
            
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);
     
            byte[] data = d.getData();
            int totalLength = d.getLength();
            //out是保存在Connection中的DataOutputStream对象
            //先写入这次rpc调用的request的总的大小
            out.writeInt(totalLength); // Total Length
            //写入rpcRequestHeader和rpcRequest
            out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          //the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });
  
    try {
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      
      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("unexpected checked exception", cause);
      }
    }
  }
}

好了,整个client端的大概思路就是这样,重点是通过动态代理得到新的对象,然后选定rpc引擎,将调用的方法名,方法参数传给服务端。

Server端

因为Server端涉及到多个客户端的调用,所以使用了如下的设计,统称为Reactor设计模式。Reactor主要是基于多路复用的非阻塞IO实现的基于事件驱动的IO框架。Hadoop RPC底层使用的是Java NIO,而Java NIO正好就是一种多路复用的非阻塞IO,Java NIO的重点就是在Selector,有兴趣的同学可以找些资料自己看下。

image

如上图所示,显示的就是在Server端处理调用的过程。大概有如下组件,我们大致分析一下:

  • Client:客户端。
  • Listener:Server端只存在一个Listener,主要功能就是分发,在Selector中注册了ACCEPT事件,没当有新的Client连接,便会为Client指定一个Reader线程。
  • Reader:Reader线程有多个,主要任务是读取请求,并将请求封装成一个Call,放入callQueue中。
  • Call:
  • Handler:
  • Responder:

接下来,我们深入到各个组件去看看。

Listener

在Listener中包含了如下几个主要的成员变量:

  • ServerSocketChannel acceptChannel:一个accept的channel
  • Selector selector:我们在Server端使用的Selector
  • Reader[] readers:Reader线程的数组

下面是Listener的构造方法

public Listener() throws IOException {
  address = new InetSocketAddress(bindAddress, port);
  // 创建一个server channel,并且设置为非阻塞模式,channel中包含了对应端口绑定的socket
  // ServerSocketChannel acceptChannel
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);
  // 将本地的ip和端口,绑定到这个server channel
  bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
  port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
  // 创建一个selector
  selector= Selector.open();
  // 创建并启动readThreads个Reader线程 
  // readThreads的大小可以在构建Server时指定
  // 也可以在配置文件中通过ipc.server.read.threadpool.size参数指定
  // 否则默认大小为1
  readers = new Reader[readThreads];
  for (int i = 0; i < readThreads; i++) {
    Reader reader = new Reader(
        "Socket Reader #" + (i + 1) + " for port " + port);
    readers[i] = reader;
    reader.start();
  }
  // 将acceptChannel的ACCEPT事件注册进selector
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
}

下面再看下Listener的run方法具体的实现:

@Override
public void run() {
//轮询从selector中去获得已经就绪的事件
  while (running) {
    SelectionKey key = null;
    try {
      //得到在构造方法中实现的selector,并执行select()方法,返回已经就绪的IO操作的channal的个数
      getSelector().select();
      //得到已经就绪的IO操作的key的集合
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              //ACCEPT事件具体的操作
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    }
...

再看下doAccept方法

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
  //从key中拿出ServerSocketChannel
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  //server.accept()返回和ServerChannel连接的SocketChannel
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);
    //从Reader数组中拿到一个Reader,下面是getReader()方法的具体实现
    //currentReader = (currentReader + 1) % readers.length;
    //return readers[currentReader];
    //currentReader 表示的是当前使用的Reader的下标,当又有一个Client连入时,
    //就加一取Readers个数的模,得到Reader的下标
    Reader reader = getReader();
    //ConnectionManager保存了Connection的集合,
    //register方法将channel包装成了Connection对象,并加入了ConncetionManager
    Connection c = connectionManager.register(channel);
    //将Connection附加在SelectionKey上面
    key.attach(c);  // so closeCurrentConnection can get the object
    //选中的reader添加Connection
    reader.addConnection(c);
  }
}

从上面的代码不难看出,Listener类是一个线程类,主要任务就是为连入的Socket分配Reader。接下来我们再看下Reader类。

Reader

Reader也是一个线程类,先来看看构造方法。

Reader(String name) throws IOException {
  super(name);
  // pendingConnections 是个阻塞队列,保存的是这个Reader分到的Connection
  // readerPendingConnectionQueue表示阻塞队列的大小
  // 可以通过ipc.server.read.connection-queue.size参数指定
  // 默认大小是100
  this.pendingConnections =
      new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
  // 每个Reader都有一个自己的Selector
  this.readSelector = Selector.open();
}

再看下run方法。

@Override
public void run() {
  try {
    // 轮询
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
    }
  }
}

在run方法中主要是去调用doRunLoop方法。

private synchronized void doRunLoop() {
  while (running) {
    SelectionKey key = null;
    try {
      //得到pendingConnections的size
      int size = pendingConnections.size();
      for (int i=size; i>0; i--) {
        //从pendingConnections队列中take得到Connection
        Connection conn = pendingConnections.take();
        //在Selector上为Connection的SocketChannal注册READ事件
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();
      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        if (key.isValid()) {
          if (key.isReadable()) {
            //读操作
            doRead(key);
          }
        }
        key = null;
      }
...

查看read操作,doRead方法。

void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  // 从SelectionKey中将之前附上的Connection拿出来
  Connection c = (Connection)key.attachment();
  if (c == null) {
    return;  
  }
  c.setLastContact(Time.now());
  
  try {
    // 读方法在这里
    count = c.readAndProcess();
  }

继续打开Connection的readAndProcess方法

// 先是读取header,在读取真实的数据
public int readAndProcess()
    throws WrappedRpcServerException, IOException, InterruptedException {
  while (true) {
    //每次之多读取一个rpc request,如果一个header没有读完成,则会一直读,
    // 直到读到第一个RPC,或者数据读完
    int count = -1;
    //在开始分析该方法之前,再把header的组成列出来
    /**
    * 而一个rpc的header由如下组成
    * +----------------------------------+
    * |  "hrpc" 4 bytes                  |
    * +----------------------------------+
    * |  Version (1 byte)                |
    * +----------------------------------+
    * |  Service Class (1 byte)          |
    * +----------------------------------+
    * |  AuthProtocol (1 byte)           |
    * +----------------------------------+
    */
    //dataLengthBuffer的大小为4个byte,用来接收"hrpc"
    //如果dataLengthBuffer还有剩余,则从channel里面读取4个byte到dataLengthBuffer
    if (dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, dataLengthBuffer);       
      if (count < 0 || dataLengthBuffer.remaining() > 0) 
        return count;
    }
    // 读取request header
    if (!connectionHeaderRead) {
      //Every connection is expected to send the header.
      if (connectionHeaderBuf == null) {
        // connectionHeaderBuf 申请3byte
        connectionHeaderBuf = ByteBuffer.allocate(3);
      }
      // 写入3个byte到connectionHeaderBuf中
      count = channelRead(channel, connectionHeaderBuf);
      if (count < 0 || connectionHeaderBuf.remaining() > 0) {
        return count;
      }
      // vesrion
      int version = connectionHeaderBuf.get(0);
      // service class 默认大小是0
      this.setServiceClass(connectionHeaderBuf.get(1));
      // 将dataLengthBuffer从写模式转为读模式
      dataLengthBuffer.flip();
      
      // HTTP_GET_BYTES : "GET "
      // 如果前四个byte是HTTP_GET_BYTES,就证明是一个HTTP请求
      // 就返回一个响应,告诉这是一个RPC调用,不是HTTP调用
      if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
        setupHttpRequestOnIpcPortResponse();
        return -1;
      }
      // RpcConstants.HEADER : "hrpc"
      if (!RpcConstants.HEADER.equals(dataLengthBuffer)
          || version != CURRENT_VERSION) {
        setupBadVersionResponse(version);
        return -1;
      }
      
      // 得到auth
      authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          
      // 清空dataLengthBuffer
      dataLengthBuffer.clear();
      connectionHeaderBuf = null;
      connectionHeaderRead = true;
      // 这里使用continue就让dataLengthBuffer再读4个byte,读出rpcRequest的长度
      continue;
    }
    //rpc 的正文如下,先是4个byte的数据,表示下面内容长度
    /**
     * +-----------------+
     |  4 bytes length |
     +-----------------+
     | IpcConnection   |
     | ContextProto    |
     +-----------------+
     */
    if (data == null) {
      dataLengthBuffer.flip();
      dataLength = dataLengthBuffer.getInt();
      // 有一个maxDataLength,表示最大的Data长度,大小为64MB
      checkDataLength(dataLength);
      data = ByteBuffer.allocate(dataLength);
    }
    // 读取data
    count = channelRead(channel, data);
    // data应该没有预留,正好读满
    if (data.remaining() == 0) {
      dataLengthBuffer.clear();
      data.flip();
      boolean isHeaderRead = connectionContextRead;
      // rpc的处理还在这里面
      processOneRpc(data.array());
      data = null;
      if (!isHeaderRead) {
        continue;
      }
    } 
    return count;
  }
}

继续打开processOneRpc方法,感觉快要到了真正封装Call的地方

private void processOneRpc(byte[] buf)
    throws IOException, WrappedRpcServerException, InterruptedException {
  int callId = -1;
  int retry = RpcConstants.INVALID_RETRY_COUNT;
  try {
    final DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
    // 从InputStream中反序列化出header,header用的是RpcRequestHeaderProto
    final RpcRequestHeaderProto header =
        decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
    callId = header.getCallId();
    retry = header.getRetryCount();
    if (LOG.isDebugEnabled()) {
      LOG.debug(" got #" + callId);
    }
    // 检查rpcRequestHeader
    checkRpcHeaders(header);
    
    if (callId < 0) { // callIds typically used during connection setup
      processRpcOutOfBandRequest(header, dis);
    } else if (!connectionContextRead) {
      throw new WrappedRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
          "Connection context not established");
    } else {
      // 处理一次rpc的请求
      processRpcRequest(header, dis);
    }
  }
}

接下来processRpcRequest,在这个方法中,去实例化了一个Call对象,并将该对象放入了callQueue之中。

private void processRpcRequest(RpcRequestHeaderProto header,
    DataInputStream dis) throws WrappedRpcServerException,
    InterruptedException {
  // 根据header中的rpcKind得到rpcRequestClass
  Class<? extends Writable> rpcRequestClass = 
      getRpcRequestWrapper(header.getRpcKind());
  Writable rpcRequest;
  try { //Read the rpc request
    // 实例化rpcRequest,如果我们使用的是WritableRpcEngine,则这里反射得到的就是Invocation类
    rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
    // 调用readFields,去读取流中的数据,具体实现在Invocation类中
    rpcRequest.readFields(dis);
  }
...      
  // 基于从Request里面提取出来的内容,构造一个Call对象
  Call call = new Call(header.getCallId(), header.getRetryCount(),
      rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
      header.getClientId().toByteArray(), traceSpan);
  //将call对象放入到callQueue对象中
  callQueue.put(call);              // queue the call; maybe blocked here
  // rpc调用的次数增加一次
  incRpcCount();  // Increment the rpc count
}

Call

下面来简单看下Call类,Call类中封装了request和response,放在callQueue之中,等待Handler的处理。

public static class Call implements Schedulable {
  private final int callId;             // the client's call id
  // 重试次数
  private final int retryCount;        // the retry count of the call
  // RPC Request 等待Handler处理
  // 如果是用的WritableRpcEngine,这里则是一个Invocation类
  private final Writable rpcRequest;    // Serialized Rpc request from client
  // 保存着和客户端的连接信息
  private final Connection connection;  // connection to client
  private long timestamp;               // time received when response is null
  // Response                            // time served when response is not null
  private ByteBuffer rpcResponse;       // the response for this call
  // rpcKind
  private final RPC.RpcKind rpcKind;
  private final byte[] clientId;
...

Handler

Handler的主要任务就是从callQueue拿出Call,并通过Request找到真实的实现方法,并通过方法名和参数进行执行。查看run()方法。

@Override
public void run() {
...
    // volatile private boolean running 标记是否还在执行
    while (running) {
      try {
        // 从callQueue拿出Call
        final Call call = callQueue.take();
          ...
    if (call.connection.user == null) {
      // 调用了call方法
            value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                       call.timestamp);
          } 
    ...

call方法是一个抽象方法,而Server类其实就是一个抽象类。

public abstract Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable param, long receiveTime) throws Exception;

Server类只有一个子类,在RPC类中实现,先查看实现的call方法。

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws Exception {
  // 先是通过rpcKind拿到RpcInvoker,然后再去调用RpcInvoker的call方法
  // 我们跳过拿rpcInvoker的过程,主要就是每个RpcEngine都会在Server里注册自己的rpcInvoker
  // 接下来我们就用WritableRpcEngine的WritableRpcInvoker来讲解
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
      receiveTime);
}

我们跳过中间拿到rpcInvoker的过程,直接来到WritableRpcInvoker的call方法。

@Override
 public Writable call(org.apache.hadoop.ipc.RPC.Server server,
     String protocolName, Writable rpcRequest, long receivedTime)
     throws IOException, RPC.VersionMismatch {
 ....
     // 通过在Call里面保存的methodName和parameter得到具体的方法
     Method method =
         protocolImpl.protocolClass.getMethod(call.getMethodName(),
         call.getParameterClasses());
     method.setAccessible(true);
     server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    // 通过反射去得到执行RPC调用,找了很久,终于在这里找到了服务端实际的调用
    // 实际的调用是通过Handler线程从callQueue里拿出的Call进行调用
     Object value = 
         method.invoke(protocolImpl.protocolImpl, call.getParameters());
    // 将最后的结果返回
     return new ObjectWritable(method.getReturnType(), value);
    ...

这里就是在服务端的最终的调用。

相关文章

网友评论

    本文标题:Hadoop YARN 源码阅读(一)Hadoop RPC 源码

    本文链接:https://www.haomeiwen.com/subject/xdldvhtx.html