美文网首页sofabolt
SOFABolt 源码分析16 - 上下文机制的设计

SOFABolt 源码分析16 - 上下文机制的设计

作者: 原水寒 | 来源:发表于2018-10-19 00:09 被阅读100次

SOFABolt 中存在四种上下文 context

  • InvokeContext:调用上下文,用于端内隐式传参,并可以通过自定义序列化器将 InvokeContext 内存储的参数自定义的序列化传递给对端(注意:InvokeContext 本身是不会传递给对端的)
  • RemotingContext:Remoting 层的上下文,程序内部使用
  • BizContext:业务上下文,提供给用户程序使用,封装了 RemotingContext,防止直接将 RemotingContext 暴露给用户
  • AsyncContext:存储存根信息,用于 AsyncUserProcessor 异步返回响应

一、InvokeContext

1.1 使用姿势

SOFABolt 源码分析10 - 精细的线程模型的设计 的 “2.4 设置 UserProcessor 自定义线程池选择器”

1.2 源码分析

public class InvokeContext {
    // ~~~ invoke context keys of client side
    public final static String                CLIENT_LOCAL_IP        = "bolt.client.local.ip";
    public final static String                CLIENT_LOCAL_PORT      = "bolt.client.local.port";
    public final static String                CLIENT_REMOTE_IP       = "bolt.client.remote.ip";
    public final static String                CLIENT_REMOTE_PORT     = "bolt.client.remote.port";
    /** time consumed during connection creating, this is a timespan */
    public final static String                CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";

    // ~~~ invoke context keys of server side
    public final static String                SERVER_LOCAL_IP        = "bolt.server.local.ip";
    public final static String                SERVER_LOCAL_PORT      = "bolt.server.local.port";
    public final static String                SERVER_REMOTE_IP       = "bolt.server.remote.ip";
    public final static String                SERVER_REMOTE_PORT     = "bolt.server.remote.port";

    // ~~~ invoke context keys of client and server side
    public final static String                BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
    /** 时间段:请求达到解码器 ~ 请求即将被处理(与处理完成) */
    public final static String                BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
    public final static String                BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
    public final static String                BOLT_CRC_SWITCH        = "bolt.invoke.crc.switch";

    public final static int                   INITIAL_SIZE           = 8;

    /** context */
    private ConcurrentHashMap<String, Object> context;

    public InvokeContext() {
        this.context = new ConcurrentHashMap<String, Object>(INITIAL_SIZE);
    }
}

注意:

  • InvokeContext 内部实际上就是一个 map,而不是像 RpcInvokeContext 一样,内部是一个 ThreadLocal(RpcInvokeContext 是 SOFARPC 的上下文)
  • 由于 InvokeContext 内部只是一个 map,所以 InvokeContext 本身不能进行“隐式传递”,InvokeContext 本身需要作为接口的参数进行传递才行,所以四种调用模式的三种调用链路方式都提供了带有 InvokeContext 参数的方法,eg. invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis)
  • InvokeContext 不会传递给对端,但是其中的内容可以通过自定义序列化器的方式传递给对端,使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计

设置建连消耗时间(仅客户端,服务端不建连)

    public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
        final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        this.connectionManager.check(conn);
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }

    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
        // 记录开始时间
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // 建连
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            if (null != invokeContext) {
                // 记录建连时间(客户端,服务端不建连)
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }

设置客户端自定义序列化器 + crc 开关 + 四要素(ip/port)+ requestID

    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 创建请求:会根据 invokeContext 配置 - 设置客户端自定义序列化器 + crc 开关
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        // 预处理 InvokeContext:设置四要素(IP/PORT) + 请求ID
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        // 发起请求
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
        // 将 invokeContext 设置到 responseCommand,可以让用户使用
        responseCommand.setInvokeContext(invokeContext);

        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
            RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

    protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
        RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);

        if (null != invokeContext) {
            // 设置客户端自定义序列化器
            Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
            if (null != clientCustomSerializer) {
                    command.setSerializer((Byte) clientCustomSerializer);
            }

            // 是否开启 crc 开关
            // enable crc by default, user can disable by set invoke context `false` for key `InvokeContext.BOLT_CRC_SWITCH`
            Boolean crcSwitch = invokeContext.get(InvokeContext.BOLT_CRC_SWITCH, ProtocolSwitch.CRC_SWITCH_DEFAULT_VALUE);
            if (null != crcSwitch && crcSwitch) {
                command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
            }
        } else {
            // enable crc by default, if there is no invoke context.
            command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
        }
        command.setTimeout(timeoutMillis);
        command.setRequestClass(request.getClass().getName());
        // 设置 invokeContext 到 RpcRequestCommand 中,后续在自定义序列化器的序列化 header 和 body 的过程中,
        // 可以自定义的从 invokeContext 中序列化信息到对端
        command.setInvokeContext(invokeContext);
        command.serialize();
        logDebugInfo(command);
        return command;
    }

============================= RpcClientRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 设置四要素
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            // 设置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

============================= RpcServerRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 设置四要素
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            // 设置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

关于序列化的东西,在《序列化设计》部分分析。

统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间

============================= RpcRequestProcessor =============================
    public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
        long currentTimestamp = System.currentTimeMillis();
        // 预处理 RemotingContext
        preProcessRemotingContext(ctx, cmd, currentTimestamp);
        ...
        dispatchToUserProcessor(ctx, cmd);
    }

    private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd,
                                           long currentTimestamp) {
        ...
        ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime());
    }

BizContext 获取 InvokeContext

public class DefaultBizContext implements BizContext {
    private RemotingContext remotingCtx;
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

二、RemotingContext

/**
 * Wrap the ChannelHandlerContext.
 */
public class RemotingContext {
    // netty ChannelHandlerContext
    private ChannelHandlerContext                       channelContext;
    // 是否是服务端
    private boolean                                     serverSide     = false;
    /** whether need handle request timeout, if true, request will be discarded. The default value is true */
    private boolean                                     timeoutDiscard = true;
    /** request arrive time stamp */
    private long                                        arriveTimestamp;
    /** request timeout setting by invoke side */
    private int                                         timeout;
    /** rpc command type:REQUEST / RESPONSE / REQUEST_ONEWAY */
    private int                                         rpcCommandType;
    // 用户业务逻辑处理器
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors;
    // 调用上下文,主要会统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间
    private InvokeContext                               invokeContext;

    public ChannelFuture writeAndFlush(RemotingCommand msg) {
        return this.channelContext.writeAndFlush(msg);
    }

    // whether this request already timeout: oneway 没有请求超时的概念
    public boolean isRequestTimeout() {
        if (this.timeout > 0 && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
            && (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
            return true;
        }
        return false;
    }

    public UserProcessor<?> getUserProcessor(String className) {
        return StringUtils.isBlank(className) ? null : this.userProcessors.get(className);
    }

    public Connection getConnection() {
        return ConnectionUtil.getConnectionFromChannel(channelContext.channel());
    }
}

================== ConnectionUtil ==================
public class ConnectionUtil {
    public static Connection getConnectionFromChannel(Channel channel) {
         // 从 channel 的附属属性中获取 Connection
        Attribute<Connection> connAttr = channel.attr(Connection.CONNECTION);
        Connection connection = connAttr.get();
        return connection;
    }
}

RemotingContext 作用:

  • 包含 userProcessors 映射:用于在处理消息流程中选择业务逻辑处理
  • 包装 ChannelHandlerContext:用于在处理消息结束后或者异常后向对端发送消息
  • 包装 InvokeContext:用于存放添加服务端链路调用上下文

注意:BizContext 会包含 RemotingContext,但是不会提供 public 的 getRemotingContext 方法,但是会提供 getInvokeContext 方法。

使用链路

================== RpcHandler ==================
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ...
        protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
    }

================== RpcRequestProcessor ==================
    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
        // 从 RemotingContext 获取 UserProcessor
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
        // set timeout check state from user's processor
        ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());

        // use the final executor dispatch process task
        executor.execute(new ProcessTask(ctx, cmd));
    }

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        // 从 RemotingContext 获取 UserProcessor
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        if (processor instanceof AsyncUserProcessor) {
            // processor.preHandleRequest:使用 BizContext 包装 RemotingContext,避免 RemotingContext 直接暴露给用户(因为 RemotingContext 包含 ChannelHandlerContext,可直接发送消息给对端)
            // 创建 RpcAsyncContext 存根:包装 RemotingContext,内部使用其做一步发送操作
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
        } else {
            // processor.preHandleRequest:使用 BizContext 包装 RemotingContext
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            // 使用 ctx.writeAndFlush(serializedResponse) 发送响应
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
        }
    }

三、BizContext

使用姿势见 SOFABolt 源码分析15 - 双工通信机制的设计 中的“1.1、基于 addr 链路模式”

public class DefaultBizContext implements BizContext {
    // 包裹 RemotingContext
    private RemotingContext remotingCtx;

    // protect 方式,只有其子类可以访问
    protected RemotingContext getRemotingCtx() {
        return this.remotingCtx;
    }

    @Override
    public String getRemoteAddress() {
        if (null != this.remotingCtx) {
            ChannelHandlerContext channelCtx = this.remotingCtx.getChannelContext();
            Channel channel = channelCtx.channel();
            if (null != channel) {
                return RemotingUtil.parseRemoteAddress(channel);
            }
        }
        return "UNKNOWN_ADDRESS";
    }
    
    ...
  
    // 这里也存储了 Connection,可以用于服务端向客户端直接发起调用
    @Override
    public Connection getConnection() {
        if (null != this.remotingCtx) {
            return this.remotingCtx.getConnection();
        }
        return null;
    }

    @Override
    public boolean isRequestTimeout() {
        return this.remotingCtx.isRequestTimeout();
    }
    
    ...
 
    @Override
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

BizContext 是直接给用户程序使用的,而 RemotingContext 是程序内部使用的

    public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
        return new DefaultBizContext(remotingCtx);
    }

四、AsyncContext

public class RpcAsyncContext implements AsyncContext {
    /** remoting context */
    private RemotingContext     ctx;
    // rpc request command: 
    // 1. 会根据请求中的 type 是否是 oneway 来决定是否向对端发送数据
    // 2. 会将 RpcRequestCommand 中的 requestID 设置给响应
    private RpcRequestCommand   cmd;

    private RpcRequestProcessor processor;

    /** is response sent already */
    private AtomicBoolean       isResponseSentAlready = new AtomicBoolean();

    // 创造响应,发送消息(发送还是使用 RemotingContext)
    @Override
    public void sendResponse(Object responseObject) {
        if (isResponseSentAlready.compareAndSet(false, true)) {
            processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
        } else {
            throw new IllegalStateException("Should not send rpc response repeatedly!");
        }
    }
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
    UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
    if (processor instanceof AsyncUserProcessor) {
        // yi'bu'chu'li
        processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
    }
}

相关文章

网友评论

    本文标题:SOFABolt 源码分析16 - 上下文机制的设计

    本文链接:https://www.haomeiwen.com/subject/gbkqaftx.html