SOFABolt Source Code Analysis 8 - RemotingCommand

Author: 原水寒 | Published: 2018-10-06 13:54

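SOFABolt wraps every request and response (and, in fact, every heartbeat) in an implementation of RemotingCommand, which is then transmitted over the network layer.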


image.png

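As the figure above shows, the RemotingCommand command protocol consists of the following parts:

  • RemotingCommand (command structure): defines the basic structure of a command
  • CommandFactory (command factory): creates RemotingCommand instances
  • CommandHandler (processor invoker): invokes the processors that handle a RemotingCommand
  • ProcessorManager (processor container): holds the RemotingCommand processors
  • RemotingProcessor (command processor): the component that actually processes a RemotingCommand
  • UserProcessor (user processor): the user-defined processor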

1. Overall design

The overall design follows the Command pattern:

RemotingCommand plays the Command role
CommandHandler plays the Client role
ProcessorManager plays the Invoker role
RemotingProcessor plays the Receiver role

CommandHandler retrieves the matching RemotingProcessor instance from ProcessorManager and uses it to process the RemotingCommand.
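The lookup-then-delegate flow described above can be sketched in plain Java. This is a minimal illustration, not SOFABolt code: SketchCommand, SketchProcessor, SketchProcessorManager and SketchCommandHandler are hypothetical stand-ins for RemotingCommand, RemotingProcessor, ProcessorManager and CommandHandler, and the String return value exists only to make the result observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RemotingCommand: only the command code matters here.
interface SketchCommand {
    short cmdCode();
}

// Hypothetical stand-in for RemotingProcessor.
interface SketchProcessor {
    String process(SketchCommand cmd);
}

// Plays the ProcessorManager role: a registry keyed by command code, with a fallback.
class SketchProcessorManager {
    private final Map<Short, SketchProcessor> cmd2processors = new ConcurrentHashMap<>();
    private final SketchProcessor defaultProcessor =
        cmd -> "no processor for code " + cmd.cmdCode();

    void registerProcessor(short cmdCode, SketchProcessor processor) {
        // A later registration for the same code replaces the earlier one.
        cmd2processors.put(cmdCode, processor);
    }

    SketchProcessor getProcessor(short cmdCode) {
        return cmd2processors.getOrDefault(cmdCode, defaultProcessor);
    }
}

// Plays the CommandHandler role: look up the processor, then delegate to it.
class SketchCommandHandler {
    final SketchProcessorManager processorManager = new SketchProcessorManager();

    String handle(SketchCommand cmd) {
        return processorManager.getProcessor(cmd.cmdCode()).process(cmd);
    }
}
```

Registering a processor for code 1 and then handling a command with code 1 reaches that processor, while an unregistered code falls through to the fallback, which is the same shape as the default processor that only logs an error in the real handler.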

2. RemotingCommand: command structure design

image.png

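Notes:

Serialization is analyzed in "Serialization Design".

  • RemotingCommand defines the command interface. For the RPC framework it provides the RpcCommand subclass; to add message-queue functionality, one could write class MessageCommand implements RemotingCommand
  • You can define your own protocol code in ProtocolCode#version
  • The fields and methods of RpcCommand and its subclasses are shown in the figure
  • ResponseStatus provides detailed response status codes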

3. CommandFactory: command factory design

image.png

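Notes:

CommandFactory handles system exceptions and business exceptions differently:

System exception responses: createTimeoutResponse, createSendFailedResponse, createConnectionClosedResponse
Business exception response: createExceptionResponse
Normal response: createResponse
Normal request: createRequestCommand

Normal response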

    public RpcResponseCommand createResponse(final Object responseObject,
                                             final RemotingCommand requestCmd) {
        RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
        if (null != responseObject) {
            response.setResponseClass(responseObject.getClass().getName());
        } else {
            response.setResponseClass(null);
        }
        // The response uses the same serializer as the request
        response.setSerializer(requestCmd.getSerializer());
        response.setProtocolSwitch(requestCmd.getProtocolSwitch());
        response.setResponseStatus(ResponseStatus.SUCCESS);
        return response;
    }

Timeout exception response

    public ResponseCommand createTimeoutResponse(InetSocketAddress address) {
        ResponseCommand responseCommand = new ResponseCommand();
        responseCommand.setResponseStatus(ResponseStatus.TIMEOUT);
        responseCommand.setResponseTimeMillis(System.currentTimeMillis());
        responseCommand.setResponseHost(address);
        return responseCommand;
    }

Business exception response

    public RpcResponseCommand createExceptionResponse(int id, final Throwable t, String errMsg) {
        RpcResponseCommand response = null;
        if (null == t) {
            response = new RpcResponseCommand(id, createServerException(errMsg));
        } else {
            response = new RpcResponseCommand(id, createServerException(t, errMsg));
        }
        response.setResponseClass(RpcServerException.class.getName());
        response.setResponseStatus(ResponseStatus.SERVER_EXCEPTION);
        return response;
    }

    private RpcServerException createServerException(String errMsg) {
        return new RpcServerException(errMsg);
    }

    private RpcServerException createServerException(Throwable t, String errMsg) {
        String formattedErrMsg = String.format(
            "[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s", t.getClass().getName(),
            t.getMessage(), errMsg);
        RpcServerException e = new RpcServerException(formattedErrMsg);
        // fill the stack trace using the stack trace of throwable
        e.setStackTrace(t.getStackTrace());
        return e;
    }

So for a business exception, the RpcResponseCommand#responseObject that is ultimately returned is an RpcServerException.
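The wrapping step can be tried in isolation. The sketch below mirrors createServerException(Throwable, String) from the listing, but SketchServerException and ServerExceptionSketch are hypothetical names introduced only for illustration; they are not part of SOFABolt.

```java
// Hypothetical stand-in for RpcServerException.
class SketchServerException extends RuntimeException {
    SketchServerException(String message) {
        super(message);
    }
}

class ServerExceptionSketch {
    // Same message format and stack-trace copy as the listing above.
    static SketchServerException wrap(Throwable t, String errMsg) {
        String formatted = String.format(
            "[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s",
            t.getClass().getName(), t.getMessage(), errMsg);
        SketchServerException e = new SketchServerException(formatted);
        // Reuse the original stack trace so the failure site stays visible.
        e.setStackTrace(t.getStackTrace());
        return e;
    }
}
```

Note that the original throwable is not attached as a cause: only its class name, message and stack trace are carried over. Presumably this spares the client from needing the server-side exception class on its classpath, although the source does not state the reason.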

Flexible exception handling (specify the ResponseStatus yourself)

    public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status) {
        RpcResponseCommand responseCommand = new RpcResponseCommand();
        responseCommand.setId(id);
        responseCommand.setResponseStatus(status);
        return responseCommand;
    }

    public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status, Throwable t) {
        RpcResponseCommand responseCommand = this.createExceptionResponse(id, status);
        responseCommand.setResponseObject(createServerException(t, null));
        responseCommand.setResponseClass(RpcServerException.class.getName());
        return responseCommand;
    }

4. CommandHandler (processor invoker) and ProcessorManager (processor container) design

image.png

Creating an RpcCommandHandler instance

    public RpcCommandHandler(CommandFactory commandFactory) {
        this.commandFactory = commandFactory;
        // Create the ProcessorManager container
        this.processorManager = new ProcessorManager();
        // Create the request processor RpcRequestProcessor and register it with ProcessorManager
        this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST, new RpcRequestProcessor(this.commandFactory));
        // Create the response processor RpcResponseProcessor and register it with ProcessorManager
        this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE, new RpcResponseProcessor());
        // Create the heartbeat processor RpcHeartBeatProcessor and register it with ProcessorManager
        this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT, new RpcHeartBeatProcessor());
        // Create the fallback (default) processor, an anonymous AbstractRemotingProcessor, and register it with ProcessorManager
        this.processorManager
            .registerDefaultProcessor(new AbstractRemotingProcessor<RemotingCommand>() {
                @Override
                public void doProcess(RemotingContext ctx, RemotingCommand msg) throws Exception {
                    logger.error("No processor available for command code {}, msgId {}", msg.getCmdCode(), msg.getId());
                }
            });
    }

Note: the default RemotingProcessor instances are already registered when RpcCommandHandler is created, so their RemotingProcessor#executor cannot be specified at that point. You can override a default RemotingProcessor with ProtocolManager.getProtocol(ProtocolCode.fromBytes(protocolCode)).getCommandHandler().registerProcessor(cmd, processor); and thereby indirectly supply RemotingProcessor#executor.

How CommandHandler handles a message

    private void handle(final RemotingContext ctx, final Object msg) {
        try {
            if (msg instanceof List) {
                final Runnable handleTask = new Runnable() {
                    @Override
                    public void run() {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Batch message! size={}", ((List<?>) msg).size());
                        }
                        for (final Object m : (List<?>) msg) {
                            RpcCommandHandler.this.process(ctx, m);
                        }
                    }
                };
                if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
                    // If msg is a list, submitting the batch to the biz thread pool frees up the IO thread.
                    // See com.alipay.remoting.decoder.ProtocolDecoder
                    processorManager.getDefaultExecutor().execute(handleTask);
                } else {
                    handleTask.run();
                }
            } else {
                process(ctx, msg);
            }
        } catch (final Throwable t) {
            processException(ctx, msg, t);
        }
    }

    private void process(RemotingContext ctx, Object msg) {
        try {
            final RpcCommand cmd = (RpcCommand) msg;
            // Look up the matching RemotingProcessor in processorManager, then hand the command to it
            final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());
            processor.process(ctx, cmd, processorManager.getDefaultExecutor());
        } catch (final Throwable t) {
            // If msg is a request && the call is not oneway && a RejectedExecutionException was thrown => build a ResponseStatus.SERVER_THREADPOOL_BUSY response and return it to the client
            processException(ctx, msg, t);
        }
    }

The case where msg is a List is analyzed in "Codec Design".

Finally, let's look at ProcessorManager.

    private ConcurrentHashMap<CommandCode, RemotingProcessor<?>> cmd2processors = new ConcurrentHashMap<CommandCode, RemotingProcessor<?>>(4); 
    private RemotingProcessor<?> defaultProcessor;
    /**
     * The default executor, if no executor is set for processor, this one will be used
     * "processor" here covers both RemotingProcessor and UserProcessor
     */
    private ExecutorService defaultExecutor;
    // -Dbolt.tp.min, default 20
    private int minPoolSize = ConfigManager.default_tp_min_size();
    // -Dbolt.tp.max, default 400
    private int maxPoolSize = ConfigManager.default_tp_max_size();
    // -Dbolt.tp.queue, default 600
    private int queueSize = ConfigManager.default_tp_queue_size();
    // -Dbolt.tp.keepalive, default 60 (seconds, per TimeUnit.SECONDS in the constructor)
    private long keepAliveTime = ConfigManager.default_tp_keepalive_time();

    public ProcessorManager() {
        defaultExecutor = new ThreadPoolExecutor(minPoolSize, 
                                                 maxPoolSize, 
                                                 keepAliveTime,
                                                 TimeUnit.SECONDS, 
                                                 new ArrayBlockingQueue<Runnable>(queueSize), 
                                                 new NamedThreadFactory("Bolt-default-executor", true));
    }

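The size of the default thread pool can be configured through the system properties noted in the comments above.
Open question: cmd2processors has an initial capacity of 4; why not 3?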
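On the default executor: a ThreadPoolExecutor backed by a bounded ArrayBlockingQueue fills its core threads first, then the queue, then grows up to the maximum size, and only then rejects work with a RejectedExecutionException (the exception that the process method comment maps to SERVER_THREADPOOL_BUSY). The sketch below uses only JDK classes with deliberately tiny sizes to make that order visible; BoundedPoolSketch and countRejected are hypothetical names, not SOFABolt API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Submits `tasks` blocking tasks to a small pool and returns how many were rejected.
    static int countRejected(int min, int max, int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(min, max, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default AbortPolicy: threads and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With min = 1, max = 2 and a queue of 1, the pool absorbs three blocked tasks and rejects the fourth. By the same reasoning, the defaults listed above (max 400, queue 600) would allow roughly 1000 in-flight tasks before rejection starts.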

5. RemotingProcessor: command processor design

image.png

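Heartbeat handling is analyzed in "Heartbeat Design".

AbstractRemotingProcessor uses the Template Method pattern: it provides the template method process, which calls the actual message-handling method doProcess; doProcess is overridden by each subclass.
For request handling, doProcess in turn invokes the user-defined UserProcessor.

RpcResponseProcessor and RpcHeartBeatProcessor override only AbstractRemotingProcessor#doProcess.
RpcRequestProcessor overrides both process and doProcess, and calls UserProcessor there to run the custom business logic.

The core template method of AbstractRemotingProcessor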

    /**
     * Template method.
     * For RpcResponseProcessor and RpcHeartBeatProcessor: if RemotingProcessor#executor is null, ProcessorManager#defaultExecutor is used.
     * For RpcRequestProcessor the executor is chosen in this order: UserProcessor#executorSelector -> UserProcessor#executor -> RemotingProcessor#executor -> ProcessorManager#defaultExecutor
     */
    @Override
    public void process(RemotingContext ctx, T msg, ExecutorService defaultExecutor)
                                                                                    throws Exception {
        ProcessTask task = new ProcessTask(ctx, msg);
        if (this.getExecutor() != null) {
            this.getExecutor().execute(task);
        } else {
            defaultExecutor.execute(task);
        }
    }

    public abstract void doProcess(RemotingContext ctx, T msg) throws Exception;

    class ProcessTask implements Runnable {
        RemotingContext ctx;
        T               msg;

        public ProcessTask(RemotingContext ctx, T msg) {
            this.ctx = ctx;
            this.msg = msg;
        }

        public void run() {
            try {
                AbstractRemotingProcessor.this.doProcess(ctx, msg);
            } catch (Throwable e) {
                //protect the thread running this task
                ...
            }
        }
    }
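The template-method structure above can be reduced to a few lines. The following is a simplified sketch under stated assumptions: TemplateProcessorSketch is a hypothetical class, the RemotingContext parameter is dropped, and java.util.concurrent.Executor is used instead of ExecutorService so that the example stays self-contained.

```java
import java.util.concurrent.Executor;

// Hypothetical simplification of AbstractRemotingProcessor: process() is the template,
// doProcess() is the hook that each subclass fills in.
abstract class TemplateProcessorSketch<T> {
    private final Executor ownExecutor; // may be null, like RemotingProcessor#executor

    TemplateProcessorSketch(Executor ownExecutor) {
        this.ownExecutor = ownExecutor;
    }

    // Template method: wrap the hook in a task and pick the executor to run it on.
    final void process(T msg, Executor defaultExecutor) {
        Runnable task = () -> {
            try {
                doProcess(msg);
            } catch (Throwable e) {
                // Protect the thread running this task; real code would log here.
            }
        };
        Executor chosen = (ownExecutor != null) ? ownExecutor : defaultExecutor;
        chosen.execute(task);
    }

    // Hook method implemented by concrete processors.
    abstract void doProcess(T msg) throws Exception;
}
```

A subclass constructed with its own executor runs doProcess there; one constructed with null falls back to the executor passed into process, which corresponds to ProcessorManager#defaultExecutor in the listing.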

Outline of the response processor RpcResponseProcessor

    public void doProcess(RemotingContext ctx, RemotingCommand cmd) {
        // Get the connection and the matching InvokeFuture instance
        Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
        InvokeFuture future = conn.removeInvokeFuture(cmd.getId());
        // Set the response and wake up the blocked thread
        future.putResponse(cmd);
        // Cancel the timeout TimerTask
        future.cancelTimeout();
        // Execute the callback
        future.executeInvokeCallback();      
    }

A quick look at DefaultInvokeFuture

    private int                      invokeId; // unique message id
    private InvokeCallbackListener   callbackListener; // callback listener
    private InvokeCallback           callback; // the actual callback
    private volatile ResponseCommand responseCommand; // final response
    private final CountDownLatch     countDownLatch = new CountDownLatch(1); // latch that blocks the caller
    private Timeout                  timeout; // Netty timeout handle
    private Throwable                cause; // exception
    private byte                     protocol; // private protocol
    private InvokeContext            invokeContext;
    private CommandFactory           commandFactory;

    // Block until a response arrives or the timeout elapses
    public ResponseCommand waitResponse(long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    // Block until a response arrives
    public ResponseCommand waitResponse() throws InterruptedException {
        this.countDownLatch.await();
        return this.responseCommand;
    }

    // Set the response and wake up the waiting thread
    public void putResponse(RemotingCommand response) {
        this.responseCommand = (ResponseCommand) response;
        this.countDownLatch.countDown();
    }

    // Whether the future is done, i.e. no longer blocking
    public boolean isDone() {
        return this.countDownLatch.getCount() <= 0;
    }

    // Execute the callback (at most once)
    public void executeInvokeCallback() {
        if (callbackListener != null) {
            if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
                callbackListener.onResponse(this);
            }
        }
    }

    // Cancel the timeout task
    public void cancelTimeout() {
        if (this.timeout != null) {
            this.timeout.cancel();
        }
    }
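The blocking behaviour of DefaultInvokeFuture rests on a single CountDownLatch. The following is a minimal sketch, assuming a hypothetical LatchFutureSketch class with a String response in place of ResponseCommand and a Runnable in place of the callback listener; it is not SOFABolt code. It shows that a timed wait simply returns null when nothing arrived in time, and that an AtomicBoolean guard keeps the callback from firing twice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, stripped-down analogue of DefaultInvokeFuture.
class LatchFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean callbackFired = new AtomicBoolean(false);
    private final Runnable callback; // may be null
    private volatile String response;

    LatchFutureSketch(Runnable callback) {
        this.callback = callback;
    }

    // Returns null if no response arrived within the timeout: the boolean
    // result of await() is ignored, exactly as in the listing above.
    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Publish the response first, then release every waiting thread.
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    boolean isDone() {
        return latch.getCount() <= 0;
    }

    // compareAndSet guarantees the callback runs at most once.
    void executeCallback() {
        if (callback != null && callbackFired.compareAndSet(false, true)) {
            callback.run();
        }
    }
}
```

Because a timeout is reported as a null return rather than an exception, callers have to check for null themselves, which is what the response == null test in the callback task does.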

Next, the RpcInvokeCallbackListener callback:

public class RpcInvokeCallbackListener implements InvokeCallbackListener {

    ...

    @Override
    public void onResponse(InvokeFuture future) {
        InvokeCallback callback = future.getInvokeCallback();
        CallbackTask task = new CallbackTask(this.getRemoteAddress(), future);
        if (callback.getExecutor() != null) {
            callback.getExecutor().execute(task);  
        } else {
            task.run();
        }
    }

    class CallbackTask implements Runnable {
        InvokeFuture future;
        
        ...
        
        @Override
        public void run() {
            InvokeCallback callback = future.getInvokeCallback();
            ResponseCommand response = (ResponseCommand) future.waitResponse(0);
            if (response == null || response.getResponseStatus() != ResponseStatus.SUCCESS) {
                ...
                callback.onException(e);
            } else {
                ...
                RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
                response.deserialize();
                callback.onResponse(rpcResponse.getResponseObject());
            } // end of else
        } // end of run
    }
}

The request processor RpcRequestProcessor

    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) throws Exception {
        // Deserialize the request class name, then look up the UserProcessor by that class
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
            return;
        }
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
        if (userProcessor == null) {
            String errMsg = "No user processor found for request: " + cmd.getRequestClass();
            sendResponseIfNecessary(... errMsg));
            return;// must end process
        }

        // set timeout check state from user's processor
        ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());

        // If the user processor asks to run in the IO thread, deserialize everything, create a ProcessTask and run it directly
        if (userProcessor.processInIOThread()) {
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
                return;
            }
            // process in io thread
            new ProcessTask(ctx, cmd).run();
            return;// end
        }

        // Otherwise pick an executor first, create a ProcessTask and run it on that executor
        // Executor selection order: userProcessor.executorSelector -> userProcessor.executor -> RemotingProcessor.executor -> ProcessorManager.defaultExecutor
        Executor executor;
        ... 
        // use the final executor dispatch process task
        executor.execute(new ProcessTask(ctx, cmd));
    }

    class ProcessTask implements Runnable {
        RemotingContext   ctx;
        RpcRequestCommand msg;

        public ProcessTask(RemotingContext ctx, RpcRequestCommand msg) {
            this.ctx = ctx;
            this.msg = msg;
        }

        public void run() {
            RpcRequestProcessor.this.doProcess(ctx, msg);
        }
    }

    public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
        ...
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
            return;
        }
        dispatchToUserProcessor(ctx, cmd);
    }

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        final int id = cmd.getId();
        final byte type = cmd.getType();
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        // Asynchronous user-defined processor
        if (processor instanceof AsyncUserProcessor) {
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
            ...
        } else {  // Synchronous user-defined processor
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
            ...
        }
    }

    public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
                                        final RemotingCommand response) {
        final int id = response.getId();
        if (type != RpcCommandType.REQUEST_ONEWAY) {
            RemotingCommand serializedResponse = response;
            response.serialize();

            ctx.writeAndFlush(serializedResponse);
        } 
    }

The processing is fairly straightforward; UserProcessor itself is covered in "User-Defined Processor Design".
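The executor selection that the listing elides can be read as a first-non-null chain. The sketch below is an assumption based solely on the order given in the code comment; ExecutorChainSketch and select are hypothetical names, and the real method may structure the checks differently.

```java
import java.util.concurrent.Executor;

class ExecutorChainSketch {
    // Returns the first non-null candidate, in the priority order from the comment.
    static Executor select(Executor fromSelector, Executor userExecutor,
                           Executor processorExecutor, Executor defaultExecutor) {
        if (fromSelector != null) {
            return fromSelector;      // chosen by UserProcessor#executorSelector
        }
        if (userExecutor != null) {
            return userExecutor;      // UserProcessor#executor
        }
        if (processorExecutor != null) {
            return processorExecutor; // RemotingProcessor#executor
        }
        return defaultExecutor;       // ProcessorManager#defaultExecutor
    }
}
```

The practical consequence is that a user processor only needs to supply an executor when it wants isolation from other traffic; otherwise its work ends up on the shared default pool.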
