美文网首页
YarnRPC过程

YarnRPC过程

作者: searchworld | 来源:发表于2017-11-26 00:10 被阅读101次

    最近在看《深入解析YARN架构设计与实现原理》,看到第三章YRAN RPC实现的时候对其中如果使用ProtoBuf不是很理解,尤其是里面同一个类会有Java定义和ProtoBuf两个实现,两者的交互细节书上写的不是很清楚,这里根据自己的理解简单记录下。这里以server端为例。

    Server的创建在是通过RpcServerFactoryPBImpl.getServer方法,这里protocol参数以ResourceTracker为例(在ResourceTrackerService.serviceStart中),instance参数就是ResourceTrackerService本身,也就是真正实现了rpc调用服务端方法的类。最终实现方法是:

    public Server getServer(Class<?> protocol, Object instance,
          InetSocketAddress addr, Configuration conf,
          SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
          String portRangeConfig) {
        
        Constructor<?> constructor = serviceCache.get(protocol);
        if (constructor == null) {
          Class<?> pbServiceImplClazz = null;
          try {
            // getPbServiceImplClassName 根据约定的规则在 PB_IMPL_PACKAGE_SUFFIX 找到  ResourceTrackerPBServiceImpl 类
            pbServiceImplClazz = localConf
                .getClassByName(getPbServiceImplClassName(protocol));
          } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Failed to load class: ["
                + getPbServiceImplClassName(protocol) + "]", e);
          }
          try {
            constructor = pbServiceImplClazz.getConstructor(protocol);
            constructor.setAccessible(true);
            serviceCache.putIfAbsent(protocol, constructor);
          } catch (NoSuchMethodException e) {
            throw new YarnRuntimeException("Could not find constructor with params: "
                + Long.TYPE + ", " + InetSocketAddress.class + ", "
                + Configuration.class, e);
          }
        }
        
        Object service = null;
        try {
          // 使用ResourceTrackerService实例去构造ResourceTrackerPBServiceImpl,
          // ResourceTrackerPBServiceImpl的真正功能只是将PB版的请求(比如NodeHeartbeatRequestProto)封装成Java版的请求(对应的是NodeHeartbeatRequestPBImpl),
          // 再传给ResourceTrackerService实际调用产生Java版的Response,转成PB版的返回
          service = constructor.newInstance(instance);
        } catch (InvocationTargetException e) {
          throw new YarnRuntimeException(e);
        } catch (IllegalAccessException e) {
          throw new YarnRuntimeException(e);
        } catch (InstantiationException e) {
          throw new YarnRuntimeException(e);
        }
    
        // service是ResourceTrackerPBServiceImpl的实例,实现 ResourceTrackerPB接口,因此pbProtocol就是ResourceTrackerPB
        Class<?> pbProtocol = service.getClass().getInterfaces()[0];
        Method method = protoCache.get(protocol);
        if (method == null) {
          Class<?> protoClazz = null;
          try {
            // 这里也是根据约定找到 ResourceTracker$ResourceTrackerService类,这个由PB生成
            protoClazz = localConf.getClassByName(getProtoClassName(protocol));
          } catch (ClassNotFoundException e) {
            throw new YarnRuntimeException("Failed to load class: ["
                + getProtoClassName(protocol) + "]", e);
          }
          try {
            method = protoClazz.getMethod("newReflectiveBlockingService",
                pbProtocol.getInterfaces()[0]);
            method.setAccessible(true);
            protoCache.putIfAbsent(protocol, method);
          } catch (NoSuchMethodException e) {
            throw new YarnRuntimeException(e);
          }
        }
        
        try {
          // method即 newReflectiveBlockingService 方法最终调用 ResourceTrackerPBServiceImpl相应的方法
          return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
              (BlockingService)method.invoke(null, service), portRangeConfig);
        } catch (InvocationTargetException e) {
          throw new YarnRuntimeException(e);
        } catch (IllegalAccessException e) {
          throw new YarnRuntimeException(e);
        } catch (IOException e) {
          throw new YarnRuntimeException(e);
        }
      }
    
    • ResourceTrackerService会启动一个RPC server,本身真正实现了服务端RPC请求的处理逻辑,在RPC server中注册自己。
    • ResourceTrackerPBServiceImpl是作为代理进行Java和PB实现的转换,实际是调用ResourceTrackerService真正的实现。Yarn RPC中Request和Response有两种,一种是PB定义的,真正序列化的时候使用;一种是Java定义的,对PB生成的进行封装,在编程的时候使用,因此需要进行转换
    • ResourceTracker$ResourceTrackerService是PB自动生成的类,最终是执行ResourceTrackerPBServiceImpl相应的方法

    相关文章

      网友评论

          本文标题:YarnRPC过程

          本文链接:https://www.haomeiwen.com/subject/efyhbxtx.html