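I have recently been reading 《深入解析YARN架构设计与实现原理》. When I reached Chapter 3, on the YARN RPC implementation, I did not fully understand how ProtoBuf is used there. In particular, the same class has both a Java definition and a ProtoBuf implementation, and the book does not explain clearly how the two interact. These notes record my own understanding, using the server side as the example.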
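The server is created through the RpcServerFactoryPBImpl.getServer method. Here the protocol argument is ResourceTracker (the call is made in ResourceTrackerService.serviceStart), and the instance argument is the ResourceTrackerService itself, that is, the class that actually implements the server-side methods invoked over RPC. The method that does the work is: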
    public Server getServer(Class<?> protocol, Object instance,
        InetSocketAddress addr, Configuration conf,
        SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
        String portRangeConfig) {
      Constructor<?> constructor = serviceCache.get(protocol);
      if (constructor == null) {
        Class<?> pbServiceImplClazz = null;
        try {
          // getPbServiceImplClassName follows a naming convention: under the
          // PB_IMPL_PACKAGE_SUFFIX package it locates the ResourceTrackerPBServiceImpl class
          pbServiceImplClazz = localConf
              .getClassByName(getPbServiceImplClassName(protocol));
        } catch (ClassNotFoundException e) {
          throw new YarnRuntimeException("Failed to load class: ["
              + getPbServiceImplClassName(protocol) + "]", e);
        }
        try {
          constructor = pbServiceImplClazz.getConstructor(protocol);
          constructor.setAccessible(true);
          serviceCache.putIfAbsent(protocol, constructor);
        } catch (NoSuchMethodException e) {
          throw new YarnRuntimeException("Could not find constructor with params: "
              + Long.TYPE + ", " + InetSocketAddress.class + ", "
              + Configuration.class, e);
        }
      }
      Object service = null;
      try {
        // Construct ResourceTrackerPBServiceImpl from the ResourceTrackerService instance.
        // All ResourceTrackerPBServiceImpl really does is wrap the PB request
        // (e.g. NodeHeartbeatRequestProto) in its Java counterpart (NodeHeartbeatRequestPBImpl),
        // pass it to ResourceTrackerService, which does the real work and returns a Java
        // response, and then convert that response back to its PB form.
        service = constructor.newInstance(instance);
      } catch (InvocationTargetException e) {
        throw new YarnRuntimeException(e);
      } catch (IllegalAccessException e) {
        throw new YarnRuntimeException(e);
      } catch (InstantiationException e) {
        throw new YarnRuntimeException(e);
      }
      // service is an instance of ResourceTrackerPBServiceImpl, which implements the
      // ResourceTrackerPB interface, so pbProtocol is ResourceTrackerPB
      Class<?> pbProtocol = service.getClass().getInterfaces()[0];
      Method method = protoCache.get(protocol);
      if (method == null) {
        Class<?> protoClazz = null;
        try {
          // Again by convention, locate the ResourceTracker$ResourceTrackerService class,
          // which is generated by PB
          protoClazz = localConf.getClassByName(getProtoClassName(protocol));
        } catch (ClassNotFoundException e) {
          throw new YarnRuntimeException("Failed to load class: ["
              + getProtoClassName(protocol) + "]", e);
        }
        try {
          method = protoClazz.getMethod("newReflectiveBlockingService",
              pbProtocol.getInterfaces()[0]);
          method.setAccessible(true);
          protoCache.putIfAbsent(protocol, method);
        } catch (NoSuchMethodException e) {
          throw new YarnRuntimeException(e);
        }
      }
      try {
        // method is newReflectiveBlockingService; the BlockingService it returns ends up
        // calling the corresponding methods of ResourceTrackerPBServiceImpl
        return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
            (BlockingService)method.invoke(null, service), portRangeConfig);
      } catch (InvocationTargetException e) {
        throw new YarnRuntimeException(e);
      } catch (IllegalAccessException e) {
        throw new YarnRuntimeException(e);
      } catch (IOException e) {
        throw new YarnRuntimeException(e);
      }
    }
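The two "by convention" lookups in the method above can be illustrated with a small standalone sketch. The suffix constants below (impl.pb.service, PBServiceImpl, org.apache.hadoop.yarn.proto, Service) and the fully qualified name of ResourceTracker are what I recall from the Hadoop source, so treat them as assumptions and check them against your Hadoop version. The class NamingConventionSketch and its helper methods are hypothetical; they work on plain strings so that nothing from Hadoop is needed to run them.

```java
public class NamingConventionSketch {
    // Assumed values of the constants in RpcServerFactoryPBImpl (verify against your version).
    static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
    static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";
    static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
    static final String PROTO_GEN_CLASS_SUFFIX = "Service";

    // Everything before the last dot of a fully qualified class name.
    static String packageOf(String fqcn) {
        return fqcn.substring(0, fqcn.lastIndexOf('.'));
    }

    // Everything after the last dot of a fully qualified class name.
    static String simpleNameOf(String fqcn) {
        return fqcn.substring(fqcn.lastIndexOf('.') + 1);
    }

    // <protocol package>.impl.pb.service.<Protocol>PBServiceImpl
    public static String pbServiceImplClassName(String protocolFqcn) {
        return packageOf(protocolFqcn) + "." + PB_IMPL_PACKAGE_SUFFIX + "."
            + simpleNameOf(protocolFqcn) + PB_IMPL_CLASS_SUFFIX;
    }

    // org.apache.hadoop.yarn.proto.<Protocol>$<Protocol>Service (a nested, PB-generated class)
    public static String protoClassName(String protocolFqcn) {
        String simple = simpleNameOf(protocolFqcn);
        return PROTO_GEN_PACKAGE_NAME + "." + simple + "$" + simple + PROTO_GEN_CLASS_SUFFIX;
    }

    public static void main(String[] args) {
        // Assumed fully qualified name of the ResourceTracker protocol interface.
        String protocol = "org.apache.hadoop.yarn.server.api.ResourceTracker";
        System.out.println(pbServiceImplClassName(protocol));
        System.out.println(protoClassName(protocol));
    }
}
```

Under these assumptions, main prints org.apache.hadoop.yarn.server.api.impl.pb.service.ResourceTrackerPBServiceImpl followed by org.apache.hadoop.yarn.proto.ResourceTracker$ResourceTrackerService, which are the two classes the comments in getServer refer to.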
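- ResourceTrackerService starts an RPC server and registers itself with it; it is the class that actually implements the handling logic for server-side RPC requests.
- ResourceTrackerPBServiceImpl acts as a proxy that converts between the Java and PB representations and delegates to the real implementation in ResourceTrackerService. YARN RPC has two kinds of Request and Response: the PB-defined ones, which are used when data is actually serialized, and the Java-defined ones, which wrap the PB-generated classes and are what application code programs against. That is why a conversion step is needed.
- ResourceTracker$ResourceTrackerService is a class generated automatically by PB; its calls end up executing the corresponding methods of ResourceTrackerPBServiceImpl.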
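To make the three roles concrete, here is a minimal sketch of the delegation chain that needs neither Hadoop nor ProtoBuf. Every type in it is a hypothetical stand-in: HeartbeatRequestProto and HeartbeatResponseProto play the PB-generated messages, HeartbeatRequest and HeartbeatResponse play the Java-side wrappers, Tracker plays ResourceTracker, TrackerService plays ResourceTrackerService, and TrackerPBServiceImpl plays ResourceTrackerPBServiceImpl. The real classes carry far more state; this only shows the shape of the conversion.

```java
import java.lang.reflect.Constructor;

public class PbAdapterSketch {
    // Stand-ins for PB-generated messages (the wire form).
    public static final class HeartbeatRequestProto {
        final int nodeId;
        public HeartbeatRequestProto(int nodeId) { this.nodeId = nodeId; }
    }
    public static final class HeartbeatResponseProto {
        final String action;
        public HeartbeatResponseProto(String action) { this.action = action; }
    }

    // Stand-ins for the Java-side records that wrap or produce the PB form.
    public static final class HeartbeatRequest {
        private final HeartbeatRequestProto proto;
        public HeartbeatRequest(HeartbeatRequestProto proto) { this.proto = proto; }
        public int getNodeId() { return proto.nodeId; }
    }
    public static final class HeartbeatResponse {
        private final String action;
        public HeartbeatResponse(String action) { this.action = action; }
        public HeartbeatResponseProto getProto() { return new HeartbeatResponseProto(action); }
    }

    // Java-level protocol: what server code is written against.
    public interface Tracker {
        HeartbeatResponse nodeHeartbeat(HeartbeatRequest request);
    }

    // PB-level protocol: what the RPC layer calls with deserialized messages.
    public interface TrackerPB {
        HeartbeatResponseProto nodeHeartbeat(HeartbeatRequestProto proto);
    }

    // The real handling logic.
    public static final class TrackerService implements Tracker {
        @Override
        public HeartbeatResponse nodeHeartbeat(HeartbeatRequest request) {
            return new HeartbeatResponse("NORMAL for node " + request.getNodeId());
        }
    }

    // The adapter: PB request in, Java call in the middle, PB response out.
    public static final class TrackerPBServiceImpl implements TrackerPB {
        private final Tracker real;
        public TrackerPBServiceImpl(Tracker real) { this.real = real; }
        @Override
        public HeartbeatResponseProto nodeHeartbeat(HeartbeatRequestProto proto) {
            HeartbeatRequest request = new HeartbeatRequest(proto);
            HeartbeatResponse response = real.nodeHeartbeat(request);
            return response.getProto();
        }
    }

    // Builds the adapter reflectively from the protocol interface, as getServer does,
    // then pushes one PB-style request through it.
    public static String handle(int nodeId) {
        try {
            Constructor<?> ctor = TrackerPBServiceImpl.class.getConstructor(Tracker.class);
            TrackerPB service = (TrackerPB) ctor.newInstance(new TrackerService());
            return service.nodeHeartbeat(new HeartbeatRequestProto(nodeId)).action;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(7));
    }
}
```

In this sketch the RPC layer is replaced by a direct call on TrackerPB. In YARN that call would come from the BlockingService returned by newReflectiveBlockingService, but the adapter in the middle does the same job.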