SOFABolt wraps requests and responses (and, in fact, heartbeats as well) in implementations of RemotingCommand, which are then transmitted by the network layer.
[Figure: overview of the RemotingCommand command protocol]
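As shown in the figure above, the RemotingCommand command protocol consists of the following parts:
- RemotingCommand command structure: defines the basic structure of a RemotingCommand
- CommandFactory command factory: creates RemotingCommand instances
- CommandHandler processor invoker: invokes the processor for a RemotingCommand
- ProcessorManager processor container: holds the RemotingCommand processors
- RemotingProcessor command processor: the component that actually processes a RemotingCommand
- UserProcessor user processor: user-defined processors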
1. Overall Design
The design as a whole applies the Command pattern:
RemotingCommand plays the Command role
CommandHandler plays the Client role
ProcessorManager plays the Invoker role
RemotingProcessor plays the Receiver role
CommandHandler takes the matching RemotingProcessor instance out of ProcessorManager and has it process the RemotingCommand.
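To make the role mapping concrete, here is a minimal sketch of dispatch by command code with a fallback processor. All type names (Command, Processor, SimpleCommand, Registry, Dispatcher) are hypothetical simplifications for illustration, not SOFABolt's real interfaces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RemotingCommand: only what dispatch needs.
interface Command {
    short cmdCode();
    int id();
}

// Hypothetical stand-in for RemotingProcessor.
interface Processor {
    String process(Command cmd);
}

final class SimpleCommand implements Command {
    private final short code;
    private final int id;

    SimpleCommand(short code, int id) {
        this.code = code;
        this.id = id;
    }

    public short cmdCode() { return code; }
    public int id() { return id; }
}

// Plays the ProcessorManager role: processors keyed by command code, plus a fallback.
final class Registry {
    private final Map<Short, Processor> processors = new ConcurrentHashMap<>();
    private volatile Processor defaultProcessor =
            cmd -> "no processor for code " + cmd.cmdCode();

    void register(short code, Processor p) { processors.put(code, p); }

    void registerDefault(Processor p) { defaultProcessor = p; }

    Processor get(short code) {
        Processor p = processors.get(code);
        return p != null ? p : defaultProcessor;
    }
}

// Plays the CommandHandler role: look up the processor, then hand it the command.
final class Dispatcher {
    private final Registry registry = new Registry();

    Registry registry() { return registry; }

    String handle(Command cmd) {
        return registry.get(cmd.cmdCode()).process(cmd);
    }
}
```

A command whose code has no registered processor falls through to the fallback, which is the same idea as the registerDefaultProcessor call in the RpcCommandHandler constructor.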
2. RemotingCommand Command Structure
data:image/s3,"s3://crabby-images/d2cba/d2cbadd19b5a576cfc3e833ed2533954c3c8c502" alt=""
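Notes:
Serialization is analyzed separately in "Serialization Design".
- RemotingCommand defines the command interface. For the RPC framework, SOFABolt provides the RpcCommand implementation; to add message-queue functionality, you could write
class MessageCommand implements RemotingCommand
and define your own protocol code in ProtocolCode#version.
- See the figure for the fields and methods of RpcCommand and its subclasses.
- ResponseStatus defines detailed response status codes.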
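As an illustration of the MessageCommand idea, the sketch below uses a hypothetical, heavily simplified command contract (MiniRemotingCommand; the real RemotingCommand interface has more methods) and a message-queue command whose topic and body are serialized into a length-prefixed byte array. The command code and the wire layout are assumptions made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical minimal contract: id, command code, serialize/deserialize.
interface MiniRemotingCommand {
    int getId();
    short getCmdCode();
    void serialize();
    void deserialize();
}

// Hypothetical message-queue command: topic and body travel as bytes on the wire.
class MessageCommand implements MiniRemotingCommand {
    static final short MQ_MESSAGE = 100; // assumed command code
    private final int id;
    private String topic;
    private String body;
    private byte[] content; // serialized form

    MessageCommand(int id, String topic, String body) {
        this.id = id;
        this.topic = topic;
        this.body = body;
    }

    MessageCommand(int id, byte[] content) {
        this.id = id;
        this.content = content;
    }

    public int getId() { return id; }
    public short getCmdCode() { return MQ_MESSAGE; }

    // Layout: [topicLen:int][topic bytes][bodyLen:int][body bytes]
    public void serialize() {
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        byte[] b = body.getBytes(StandardCharsets.UTF_8);
        content = ByteBuffer.allocate(8 + t.length + b.length)
                .putInt(t.length).put(t).putInt(b.length).put(b).array();
    }

    public void deserialize() {
        ByteBuffer buf = ByteBuffer.wrap(content);
        byte[] t = new byte[buf.getInt()];
        buf.get(t);
        byte[] b = new byte[buf.getInt()];
        buf.get(b);
        topic = new String(t, StandardCharsets.UTF_8);
        body = new String(b, StandardCharsets.UTF_8);
    }

    byte[] getContent() { return content; }
    String getTopic() { return topic; }
    String getBody() { return body; }
}
```

The sender calls serialize() before writing and the receiver calls deserialize() after reading, which is the same division of labour RpcCommand uses around the network layer.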
3. CommandFactory Command Factory
data:image/s3,"s3://crabby-images/5b109/5b109ce6c42a92e39eef743e238c9313132b4fdd" alt=""
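Notes:
CommandFactory handles system exceptions and business exceptions differently:
System exception responses: createTimeoutResponse, createSendFailedResponse, createConnectionClosedResponse
Business exception response: createExceptionResponse
Normal response: createResponse
Normal request: createRequestCommand
Normal response (createResponse)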
public RpcResponseCommand createResponse(final Object responseObject,
                                         final RemotingCommand requestCmd) {
    RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
    if (null != responseObject) {
        response.setResponseClass(responseObject.getClass().getName());
    } else {
        response.setResponseClass(null);
    }
    // The response uses the same serializer as the request
    response.setSerializer(requestCmd.getSerializer());
    response.setProtocolSwitch(requestCmd.getProtocolSwitch());
    response.setResponseStatus(ResponseStatus.SUCCESS);
    return response;
}
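The point of createResponse is that a response mirrors its request: the same id (so the caller can find its pending call, as RpcResponseProcessor does with removeInvokeFuture(cmd.getId())) and the same serializer. A minimal sketch with hypothetical MiniRequest/MiniResponse/MiniCommandFactory classes, not SOFABolt types:

```java
// Hypothetical, simplified request/response types.
final class MiniRequest {
    final int id;
    final byte serializer;

    MiniRequest(int id, byte serializer) {
        this.id = id;
        this.serializer = serializer;
    }
}

final class MiniResponse {
    int id;
    byte serializer;
    String responseClass; // null when there is no response object
    Object responseObject;
    String status;
}

final class MiniCommandFactory {
    static MiniResponse createResponse(Object responseObject, MiniRequest request) {
        MiniResponse response = new MiniResponse();
        response.id = request.id;                 // same id as the request
        response.responseObject = responseObject;
        response.responseClass = responseObject == null
                ? null : responseObject.getClass().getName();
        response.serializer = request.serializer; // same serializer as the request
        response.status = "SUCCESS";
        return response;
    }
}
```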
Timeout exception response
public ResponseCommand createTimeoutResponse(InetSocketAddress address) {
    ResponseCommand responseCommand = new ResponseCommand();
    responseCommand.setResponseStatus(ResponseStatus.TIMEOUT);
    responseCommand.setResponseTimeMillis(System.currentTimeMillis());
    responseCommand.setResponseHost(address);
    return responseCommand;
}
Business exception response
public RpcResponseCommand createExceptionResponse(int id, final Throwable t, String errMsg) {
    RpcResponseCommand response = null;
    if (null == t) {
        response = new RpcResponseCommand(id, createServerException(errMsg));
    } else {
        response = new RpcResponseCommand(id, createServerException(t, errMsg));
    }
    response.setResponseClass(RpcServerException.class.getName());
    response.setResponseStatus(ResponseStatus.SERVER_EXCEPTION);
    return response;
}

private RpcServerException createServerException(String errMsg) {
    return new RpcServerException(errMsg);
}

private RpcServerException createServerException(Throwable t, String errMsg) {
    String formattedErrMsg = String.format(
        "[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s", t.getClass().getName(),
        t.getMessage(), errMsg);
    RpcServerException e = new RpcServerException(formattedErrMsg);
    // fill the stack trace using the stack trace of throwable
    e.setStackTrace(t.getStackTrace());
    return e;
}
So for a business exception, RpcResponseCommand#responseObject in the returned response is an RpcServerException.
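The message formatting and stack-trace copying in createServerException can be reproduced with plain JDK classes. In this sketch ServerException stands in for RpcServerException and ServerExceptions.wrap is a hypothetical helper name. Copying the stack trace lets the client see where the original exception was thrown on the server rather than where the wrapper was created.

```java
// Hypothetical stand-in for RpcServerException.
class ServerException extends RuntimeException {
    ServerException(String message) {
        super(message);
    }
}

final class ServerExceptions {
    // Mirrors createServerException(Throwable, String): original class name and message,
    // plus the additional message, with the original stack trace reused.
    static ServerException wrap(Throwable t, String errMsg) {
        String formatted = String.format(
                "[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s",
                t.getClass().getName(), t.getMessage(), errMsg);
        ServerException e = new ServerException(formatted);
        e.setStackTrace(t.getStackTrace());
        return e;
    }
}
```

Passing null as errMsg, as createExceptionResponse(id, status, t) does, yields a message ending in "AdditionalErrorMsg: null", because String.format renders a null argument for %s as "null".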
Flexible exception responses (the caller specifies the ResponseStatus)
public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status) {
    RpcResponseCommand responseCommand = new RpcResponseCommand();
    responseCommand.setId(id);
    responseCommand.setResponseStatus(status);
    return responseCommand;
}

public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status, Throwable t) {
    RpcResponseCommand responseCommand = this.createExceptionResponse(id, status);
    responseCommand.setResponseObject(createServerException(t, null));
    responseCommand.setResponseClass(RpcServerException.class.getName());
    return responseCommand;
}
4. CommandHandler (Processor Invoker) and ProcessorManager (Processor Container)
data:image/s3,"s3://crabby-images/d52ec/d52ecda05c32727f38fdbfc81be5ab63bdf099e3" alt=""
Creating an RpcCommandHandler instance
public RpcCommandHandler(CommandFactory commandFactory) {
    this.commandFactory = commandFactory;
    // Create the ProcessorManager container
    this.processorManager = new ProcessorManager();
    // Create the request processor RpcRequestProcessor and register it with ProcessorManager
    this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST, new RpcRequestProcessor(this.commandFactory));
    // Create the response processor RpcResponseProcessor and register it with ProcessorManager
    this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE, new RpcResponseProcessor());
    // Create the heartbeat processor RpcHeartBeatProcessor and register it with ProcessorManager
    this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT, new RpcHeartBeatProcessor());
    // Register a fallback (default) processor that only logs commands with no registered processor
    this.processorManager
        .registerDefaultProcessor(new AbstractRemotingProcessor<RemotingCommand>() {
            @Override
            public void doProcess(RemotingContext ctx, RemotingCommand msg) throws Exception {
                logger.error("No processor available for command code {}, msgId {}", msg.getCmdCode(), msg.getId());
            }
        });
}
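Note: the default RemotingProcessors are already registered when RpcCommandHandler is created, so you cannot specify RemotingProcessor#executor for them yourself. You can, however, override a default RemotingProcessor with
ProtocolManager.getProtocol(ProtocolCode.fromBytes(protocolCode)).getCommandHandler().registerProcessor(cmd, processor);
and in this way indirectly supply your own RemotingProcessor#executor through the processor you register.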
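Overriding works because registration is a map put keyed by command code: registering a second processor for the same code replaces the first. A minimal sketch, assuming a map-backed registry like the cmd2processors field of ProcessorManager; MiniProcessor and MiniProcessorManager are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical processor that only carries a name and an optional executor.
final class MiniProcessor {
    final String name;
    final Executor executor; // null means "use the default executor"

    MiniProcessor(String name, Executor executor) {
        this.name = name;
        this.executor = executor;
    }
}

final class MiniProcessorManager {
    private final Map<Short, MiniProcessor> processors = new ConcurrentHashMap<>();

    // put() replaces any existing mapping and returns the previous processor (or null)
    MiniProcessor register(short code, MiniProcessor processor) {
        return processors.put(code, processor);
    }

    MiniProcessor get(short code) {
        return processors.get(code);
    }
}
```

Re-registering under the built-in code with a processor that carries its own executor is how a custom executor reaches a built-in command type.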
How CommandHandler handles a message
private void handle(final RemotingContext ctx, final Object msg) {
    try {
        if (msg instanceof List) {
            final Runnable handleTask = new Runnable() {
                @Override
                public void run() {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Batch message! size={}", ((List<?>) msg).size());
                    }
                    for (final Object m : (List<?>) msg) {
                        RpcCommandHandler.this.process(ctx, m);
                    }
                }
            };
            if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
                // If msg is a list, submitting the whole batch to the biz thread pool saves IO thread time.
                // See com.alipay.remoting.decoder.ProtocolDecoder
                processorManager.getDefaultExecutor().execute(handleTask);
            } else {
                handleTask.run();
            }
        } else {
            process(ctx, msg);
        }
    } catch (final Throwable t) {
        processException(ctx, msg, t);
    }
}

private void process(RemotingContext ctx, Object msg) {
    try {
        final RpcCommand cmd = (RpcCommand) msg;
        // Look up the RemotingProcessor for this command code in processorManager and let it process the command
        final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());
        processor.process(ctx, cmd, processorManager.getDefaultExecutor());
    } catch (final Throwable t) {
        // If msg is a request && the call is not oneway && a RejectedExecutionException was thrown
        // => build a ResponseStatus.SERVER_THREADPOOL_BUSY response and send it back to the client
        processException(ctx, msg, t);
    }
}
The case where msg is a List is analyzed in "Codec Design".
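Stripped of the SOFABolt types, the List branch of handle boils down to this: wrap the per-message loop in one task and either submit it to a business executor or run it inline in the current (IO) thread, depending on a switch. A minimal sketch; BatchDispatch is a hypothetical name, the Consumer stands in for RpcCommandHandler#process and the boolean for RpcConfigManager.dispatch_msg_list_in_default_executor().

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class BatchDispatch {
    static void handle(Object msg, Consumer<Object> process, Executor bizExecutor,
                       boolean dispatchListInExecutor) {
        if (msg instanceof List) {
            // One task for the whole batch: the IO thread hands the list off in a single step
            Runnable task = () -> {
                for (Object m : (List<?>) msg) {
                    process.accept(m);
                }
            };
            if (dispatchListInExecutor) {
                bizExecutor.execute(task);
            } else {
                task.run();
            }
        } else {
            // A single command is processed directly
            process.accept(msg);
        }
    }
}
```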
Finally, let's look at ProcessorManager.
private ConcurrentHashMap<CommandCode, RemotingProcessor<?>> cmd2processors = new ConcurrentHashMap<CommandCode, RemotingProcessor<?>>(4);
private RemotingProcessor<?> defaultProcessor;
/**
 * The default executor, if no executor is set for processor, this one will be used.
 * "processor" here covers both RemotingProcessor and UserProcessor.
 */
private ExecutorService defaultExecutor;
// -Dbolt.tp.min, default 20
private int minPoolSize = ConfigManager.default_tp_min_size();
// -Dbolt.tp.max, default 400
private int maxPoolSize = ConfigManager.default_tp_max_size();
// -Dbolt.tp.queue, default 600
private int queueSize = ConfigManager.default_tp_queue_size();
// -Dbolt.tp.keepalive, default 60 (seconds)
private long keepAliveTime = ConfigManager.default_tp_keepalive_time();

public ProcessorManager() {
    defaultExecutor = new ThreadPoolExecutor(minPoolSize,
        maxPoolSize,
        keepAliveTime,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(queueSize),
        new NamedThreadFactory("Bolt-default-executor", true));
}
The size of the default thread pool can be configured through the -D system properties noted in the code comments above.
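The sketch below shows two things: reading the -D properties with defaults (property names and defaults come from the comments above; intProp is a hypothetical helper, not ConfigManager's API), and what happens when such a bounded pool is saturated. With an ArrayBlockingQueue and the default AbortPolicy, a task is rejected once all maxPoolSize threads are busy and the queue is full; that RejectedExecutionException is what the catch block in process() turns into a SERVER_THREADPOOL_BUSY response.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolConfig {
    // Hypothetical helper: read an int property such as -Dbolt.tp.min=50, else use the default.
    static int intProp(String key, int defaultValue) {
        return Integer.parseInt(System.getProperty(key, String.valueOf(defaultValue)));
    }

    // Same shape as ProcessorManager's pool: bounded queue, default AbortPolicy.
    static ThreadPoolExecutor newExecutor(int min, int max, long keepAliveSeconds, int queueSize) {
        return new ThreadPoolExecutor(min, max, keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
    }

    static ThreadPoolExecutor newDefaultExecutor() {
        return newExecutor(intProp("bolt.tp.min", 20), intProp("bolt.tp.max", 400),
                intProp("bolt.tp.keepalive", 60), intProp("bolt.tp.queue", 600));
    }
}
```

For example, a pool created with newExecutor(1, 1, 60, 1) accepts two long-running tasks (one running, one queued) and rejects the third.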
Open question: why is the initial capacity of cmd2processors 4 rather than 3?
5. RemotingProcessor Command Processor
data:image/s3,"s3://crabby-images/636c0/636c0bf04bc15d6cf934e929cfd45710475ceaf3" alt=""
Heartbeat handling is analyzed in "Heartbeat Design".
AbstractRemotingProcessor uses the Template Method pattern: its template method process calls doProcess, the method that does the real work, and each subclass overrides doProcess.
RpcResponseProcessor and RpcHeartBeatProcessor override only AbstractRemotingProcessor#doProcess.
RpcRequestProcessor overrides both process and doProcess, and calls the UserProcessor from them to run the custom business logic.
Core template method of AbstractRemotingProcessor
/**
 * Template method.
 * For RpcResponseProcessor and RpcHeartBeatProcessor: if RemotingProcessor#executor is null, ProcessorManager#defaultExecutor is used.
 * For RpcRequestProcessor, the thread pool is chosen in this order:
 * UserProcessor#executorSelector -> UserProcessor#executor -> RemotingProcessor#executor -> ProcessorManager#defaultExecutor
 */
@Override
public void process(RemotingContext ctx, T msg, ExecutorService defaultExecutor)
        throws Exception {
    ProcessTask task = new ProcessTask(ctx, msg);
    if (this.getExecutor() != null) {
        this.getExecutor().execute(task);
    } else {
        defaultExecutor.execute(task);
    }
}

public abstract void doProcess(RemotingContext ctx, T msg) throws Exception;

class ProcessTask implements Runnable {
    RemotingContext ctx;
    T msg;

    public ProcessTask(RemotingContext ctx, T msg) {
        this.ctx = ctx;
        this.msg = msg;
    }

    public void run() {
        try {
            AbstractRemotingProcessor.this.doProcess(ctx, msg);
        } catch (Throwable e) {
            //protect the thread running this task
            ...
        }
    }
}
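Without the SOFABolt types, the template method comes down to: wrap doProcess in a task that catches every Throwable, then run it on the processor's own executor if one is set, else on the default executor. TemplateProcessor and onError are hypothetical names in this sketch.

```java
import java.util.concurrent.Executor;

abstract class TemplateProcessor<T> {
    private final Executor executor; // may be null

    TemplateProcessor(Executor executor) {
        this.executor = executor;
    }

    // Template method: fixed skeleton, subclasses only supply doProcess
    final void process(T msg, Executor defaultExecutor) {
        Runnable task = () -> {
            try {
                doProcess(msg);
            } catch (Throwable e) {
                onError(e); // protect the thread running this task
            }
        };
        (executor != null ? executor : defaultExecutor).execute(task);
    }

    protected abstract void doProcess(T msg) throws Exception;

    protected void onError(Throwable e) {
        // default: swallow; a real processor would log here
    }
}
```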
Outline of the response processor RpcResponseProcessor
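public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
    // Get the connection and the InvokeFuture registered for this request id
    Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
    InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
    // The future may already be gone (e.g. the call timed out), so guard against null
    if (future != null) {
        // Set the response + wake up the blocked thread
        future.putResponse(cmd);
        // Cancel the timeout TimerTask
        future.cancelTimeout();
        // Run the callback
        future.executeInvokeCallback();
    }
}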
A quick look at DefaultInvokeFuture
private int invokeId;                              // unique message id
private InvokeCallbackListener callbackListener;   // callback listener
private InvokeCallback callback;                   // the actual user callback
private volatile ResponseCommand responseCommand;  // the final response
private final CountDownLatch countDownLatch = new CountDownLatch(1); // blocks waiting threads
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false); // ensures the callback runs at most once
private Timeout timeout;                           // Netty timeout handle
private Throwable cause;                           // exception, if any
private byte protocol;                             // protocol code (private protocol)
private InvokeContext invokeContext;
private CommandFactory commandFactory;

// Block until the response arrives or timeoutMillis elapses; returns null if no response arrived
public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}

// Block until the response arrives
public ResponseCommand waitResponse() throws InterruptedException {
    this.countDownLatch.await();
    return this.responseCommand;
}

// Set the response + wake up waiting threads
public void putResponse(RemotingCommand response) {
    this.responseCommand = (ResponseCommand) response;
    this.countDownLatch.countDown();
}

// Whether the response has arrived (waiters are no longer blocked)
public boolean isDone() {
    return this.countDownLatch.getCount() <= 0;
}

// Run the callback
public void executeInvokeCallback() {
    if (callbackListener != null) {
        if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
            callbackListener.onResponse(this);
        }
    }
}

// Cancel the timeout task
public void cancelTimeout() {
    if (this.timeout != null) {
        this.timeout.cancel();
    }
}
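The core of DefaultInvokeFuture is a one-shot CountDownLatch plus an AtomicBoolean guard. A minimal generic sketch of that mechanism; MiniInvokeFuture is a hypothetical name and a Consumer stands in for the callback listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class MiniInvokeFuture<R> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean callbackDone = new AtomicBoolean(false);
    private final Consumer<R> callback; // may be null (no callback registered)
    private volatile R response;

    MiniInvokeFuture(Consumer<R> callback) {
        this.callback = callback;
    }

    // Returns null if no response arrived within timeoutMillis
    R waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Publish the response, then release every waiting thread
    void putResponse(R r) {
        response = r;
        latch.countDown();
    }

    boolean isDone() {
        return latch.getCount() <= 0;
    }

    // compareAndSet makes the callback fire at most once, even if called repeatedly
    void executeCallback() {
        if (callback != null && callbackDone.compareAndSet(false, true)) {
            callback.accept(response);
        }
    }
}
```

Writing the volatile field before countDown() means a thread released by await() sees the response.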
Next, the RpcInvokeCallbackListener callback:
public class RpcInvokeCallbackListener implements InvokeCallbackListener {
    ...
    @Override
    public void onResponse(InvokeFuture future) {
        InvokeCallback callback = future.getInvokeCallback();
        CallbackTask task = new CallbackTask(this.getRemoteAddress(), future);
        if (callback.getExecutor() != null) {
            callback.getExecutor().execute(task);
        } else {
            task.run();
        }
    }

    class CallbackTask implements Runnable {
        InvokeFuture future;
        ...
        @Override
        public void run() {
            InvokeCallback callback = future.getInvokeCallback();
            ResponseCommand response = (ResponseCommand) future.waitResponse(0);
            if (response == null || response.getResponseStatus() != ResponseStatus.SUCCESS) {
                ...
                callback.onException(e);
            } else {
                ...
                RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
                response.deserialize();
                callback.onResponse(rpcResponse.getResponseObject());
            } // end of else
        } // end of run
    }
}
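The dispatch decision in onResponse is small enough to isolate: run the callback on its own executor if it has one, otherwise inline in the calling thread, and route non-successful results to onException. A sketch with a hypothetical MiniCallback interface; the boolean success flag stands in for the ResponseStatus check.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for InvokeCallback.
interface MiniCallback {
    void onResponse(Object result);
    void onException(Throwable e);
    Executor getExecutor(); // null means "run in the calling thread"
}

final class CallbackDispatcher {
    static void dispatch(MiniCallback cb, boolean success, Object payload) {
        Runnable task = () -> {
            if (success) {
                cb.onResponse(payload);
            } else {
                cb.onException(new RuntimeException("invoke failed: " + payload));
            }
        };
        Executor ex = cb.getExecutor();
        if (ex != null) {
            ex.execute(task);
        } else {
            task.run();
        }
    }
}
```

Running inline avoids a thread hop but occupies the thread that delivered the response, so a slow callback is better given its own executor.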
Request processor RpcRequestProcessor
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) throws Exception {
    // Deserialize the request class name (clazz), then look up the UserProcessor by clazz
    if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
        return;
    }
    UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
    if (userProcessor == null) {
        String errMsg = "No user processor found for request: " + cmd.getRequestClass();
        sendResponseIfNecessary(... errMsg));
        return;// must end process
    }
    // set timeout check state from user's processor
    ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
    // If the request is to be processed in the IO thread: deserialize everything, create a ProcessTask and run it directly
    if (userProcessor.processInIOThread()) {
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
            return;
        }
        // process in io thread
        new ProcessTask(ctx, cmd).run();
        return;// end
    }
    // Otherwise: pick a thread pool, create a ProcessTask and run it in that pool
    // Pool selection order: userProcessor.executorSelector -> userProcessor.executor -> RemotingProcessor.executor -> ProcessorManager.defaultExecutor
    Executor executor;
    ...
    // use the final executor dispatch process task
    executor.execute(new ProcessTask(ctx, cmd));
}
class ProcessTask implements Runnable {
    RemotingContext ctx;
    RpcRequestCommand msg;

    public ProcessTask(RemotingContext ctx, RpcRequestCommand msg) {
        this.ctx = ctx;
        this.msg = msg;
    }

    public void run() {
        RpcRequestProcessor.this.doProcess(ctx, msg);
    }
}
public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
    ...
    if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
        return;
    }
    dispatchToUserProcessor(ctx, cmd);
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
    final int id = cmd.getId();
    final byte type = cmd.getType();
    UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
    // Asynchronous user processor
    if (processor instanceof AsyncUserProcessor) {
        processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
        ...
    } else { // Synchronous user processor
        Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
        sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
        ...
    }
}
public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
                                    final RemotingCommand response) {
    final int id = response.getId();
    // A oneway request expects no response, so nothing is written back
    if (type != RpcCommandType.REQUEST_ONEWAY) {
        RemotingCommand serializedResponse = response;
        response.serialize();
        ctx.writeAndFlush(serializedResponse);
    }
}
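The executor-selection step elided in process() can be sketched as a pure function over the four candidates named in its comment. MiniExecutorSelector and ExecutorChain are hypothetical names. In this sketch a configured selector takes precedence over the user processor's fixed executor, and if neither yields an executor, the processor's own executor and then the default pool are tried.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for a user processor's executor selector.
interface MiniExecutorSelector {
    Executor select(String requestClass, Object requestHeader);
}

final class ExecutorChain {
    static Executor select(MiniExecutorSelector selector, String requestClass, Object requestHeader,
                           Executor userExecutor, Executor processorExecutor,
                           Executor defaultExecutor) {
        // Step 1: a configured selector wins over the user processor's fixed executor
        Executor executor = (selector != null)
                ? selector.select(requestClass, requestHeader)
                : userExecutor;
        // Step 2: still nothing? Fall back to the processor's executor, then the default pool
        if (executor == null) {
            executor = (processorExecutor != null) ? processorExecutor : defaultExecutor;
        }
        return executor;
    }
}
```

Keeping selection separate from dispatch makes the precedence order easy to read and to test on its own.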
The processing itself is fairly straightforward; UserProcessor is analyzed in "User-Defined Processor Design".
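For a synchronous user processor, the request path in dispatchToUserProcessor and sendResponseIfNecessary reduces to "call the handler, then write the result back unless the request was oneway". Note that the handler still runs for a oneway request; only the write is skipped. A minimal sketch: SyncDispatch is a hypothetical name, the Consumer stands in for ctx.writeAndFlush, and the type-code values are assumptions for illustration (check RpcCommandType for the real ones).

```java
import java.util.function.Consumer;
import java.util.function.Function;

final class SyncDispatch {
    // Assumed type codes, for illustration only
    static final byte REQUEST = 1;
    static final byte REQUEST_ONEWAY = 2;

    // Sync path: run the handler, then reply unless the caller asked for oneway
    static void dispatch(byte type, Object request, Function<Object, Object> handler,
                         Consumer<Object> channel) {
        Object response = handler.apply(request);
        sendResponseIfNecessary(type, response, channel);
    }

    static boolean sendResponseIfNecessary(byte type, Object response, Consumer<Object> channel) {
        if (type == REQUEST_ONEWAY) {
            return false; // nobody is waiting for this reply
        }
        channel.accept(response);
        return true;
    }
}
```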