SOFABolt 中存在四种上下文 context
- InvokeContext:调用上下文,用于端内隐式传参,并可以通过自定义序列化器将 InvokeContext 内存储的参数自定义的序列化传递给对端(注意:InvokeContext 本身是不会传递给对端的)
- RemotingContext:Remoting 层的上下文,程序内部使用
- BizContext:业务上下文,提供给用户程序使用,封装了 RemotingContext,防止直接将 RemotingContext 暴露给用户
- AsyncContext:存储存根信息,用于 AsyncUserProcessor 异步返回响应
一、InvokeContext
1.1 使用姿势
见 SOFABolt 源码分析10 - 精细的线程模型的设计 的 “2.4 设置 UserProcessor 自定义线程池选择器”
1.2 源码分析
public class InvokeContext {
// ~~~ invoke context keys of client side
public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
/** time consumed during connection creating, this is a timespan */
public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
// ~~~ invoke context keys of server side
public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";
// ~~~ invoke context keys of client and server side
public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
/** 时间段:请求达到解码器 ~ 请求即将被处理(与处理完成) */
public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";
public final static int INITIAL_SIZE = 8;
/** context */
private ConcurrentHashMap<String, Object> context;
public InvokeContext() {
this.context = new ConcurrentHashMap<String, Object>(INITIAL_SIZE);
}
}
注意:
- InvokeContext 内部实际上就是一个 map,而不是像 RpcInvokeContext 一样,内部是一个 ThreadLocal(RpcInvokeContext 是 SOFARPC 的上下文)
- 由于 InvokeContext 内部只是一个 map,所以 InvokeContext 本身不能进行“隐式传递”,InvokeContext 本身需要作为接口的参数进行传递才行,所以四种调用模式的三种调用链路方式都提供了带有 InvokeContext 参数的方法,eg. invokeSync(Connection conn, Object request,
InvokeContext invokeContext
, int timeoutMillis)- InvokeContext 不会传递给对端,但是其中的内容可以通过自定义序列化器的方式传递给对端,使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计
设置建连消耗时间(仅客户端,服务端不建连)
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
this.connectionManager.check(conn);
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
// 记录开始时间
long start = System.currentTimeMillis();
Connection conn;
try {
// 建连
conn = this.connectionManager.getAndCreateIfAbsent(url);
} finally {
if (null != invokeContext) {
// 记录建连时间(客户端,服务端不建连)
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
}
}
return conn;
}
设置客户端自定义序列化器 + crc 开关 + 四要素(ip/port)+ requestID
public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 创建请求:会根据 invokeContext 配置 - 设置客户端自定义序列化器 + crc 开关
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
// 预处理 InvokeContext:设置四要素(IP/PORT) + 请求ID
preProcessInvokeContext(invokeContext, requestCommand, conn);
// 发起请求
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
// 将 invokeContext 设置到 responseCommand,可以让用户使用
responseCommand.setInvokeContext(invokeContext);
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);
if (null != invokeContext) {
// 设置客户端自定义序列化器
Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
if (null != clientCustomSerializer) {
command.setSerializer((Byte) clientCustomSerializer);
}
// 是否开启 crc 开关
// enable crc by default, user can disable by set invoke context `false` for key `InvokeContext.BOLT_CRC_SWITCH`
Boolean crcSwitch = invokeContext.get(InvokeContext.BOLT_CRC_SWITCH, ProtocolSwitch.CRC_SWITCH_DEFAULT_VALUE);
if (null != crcSwitch && crcSwitch) {
command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
}
} else {
// enable crc by default, if there is no invoke context.
command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
}
command.setTimeout(timeoutMillis);
command.setRequestClass(request.getClass().getName());
// 设置 invokeContext 到 RpcRequestCommand 中,后续在自定义序列化器的序列化 header 和 body 的过程中,
// 可以自定义的从 invokeContext 中序列化信息到对端
command.setInvokeContext(invokeContext);
command.serialize();
logDebugInfo(command);
return command;
}
============================= RpcClientRemoting =============================
protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
if (null != invokeContext) {
// 设置四要素
invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
// 设置 requestID
invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
}
}
============================= RpcServerRemoting =============================
protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
if (null != invokeContext) {
// 设置四要素
invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
// 设置 requestID
invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
}
}
关于序列化的东西,在《序列化设计》部分分析。
统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间
============================= RpcRequestProcessor =============================
public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
long currentTimestamp = System.currentTimeMillis();
// 预处理 RemotingContext
preProcessRemotingContext(ctx, cmd, currentTimestamp);
...
dispatchToUserProcessor(ctx, cmd);
}
private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd,
long currentTimestamp) {
...
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime());
}
BizContext 获取 InvokeContext
public class DefaultBizContext implements BizContext {
private RemotingContext remotingCtx;
public InvokeContext getInvokeContext() {
return this.remotingCtx.getInvokeContext();
}
}
二、RemotingContext
/**
* Wrap the ChannelHandlerContext.
*/
public class RemotingContext {
// netty ChannelHandlerContext
private ChannelHandlerContext channelContext;
// 是否是服务端
private boolean serverSide = false;
/** whether need handle request timeout, if true, request will be discarded. The default value is true */
private boolean timeoutDiscard = true;
/** request arrive time stamp */
private long arriveTimestamp;
/** request timeout setting by invoke side */
private int timeout;
/** rpc command type:REQUEST / RESPONSE / REQUEST_ONEWAY */
private int rpcCommandType;
// 用户业务逻辑处理器
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors;
// 调用上下文,主要会统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间
private InvokeContext invokeContext;
public ChannelFuture writeAndFlush(RemotingCommand msg) {
return this.channelContext.writeAndFlush(msg);
}
// whether this request already timeout: oneway 没有请求超时的概念
public boolean isRequestTimeout() {
if (this.timeout > 0 && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
&& (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
return true;
}
return false;
}
public UserProcessor<?> getUserProcessor(String className) {
return StringUtils.isBlank(className) ? null : this.userProcessors.get(className);
}
public Connection getConnection() {
return ConnectionUtil.getConnectionFromChannel(channelContext.channel());
}
}
================== ConnectionUtil ==================
public class ConnectionUtil {
public static Connection getConnectionFromChannel(Channel channel) {
// 从 channel 的附属属性中获取 Connection
Attribute<Connection> connAttr = channel.attr(Connection.CONNECTION);
Connection connection = connAttr.get();
return connection;
}
}
RemotingContext 作用:
- 包含 userProcessors 映射:用于在处理消息流程中选择业务逻辑处理
- 包装 ChannelHandlerContext:用于在处理消息结束后或者异常后向对端发送消息
- 包装 InvokeContext:用于存放添加服务端链路调用上下文
注意:BizContext 会包含 RemotingContext,但是不会提供 public 的 getRemotingContext 方法,但是会提供 getInvokeContext 方法。
使用链路
================== RpcHandler ==================
public void channelRead(ChannelHandlerContext ctx, Object msg) {
...
protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
}
================== RpcRequestProcessor ==================
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
// 从 RemotingContext 获取 UserProcessor
UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
// set timeout check state from user's processor
ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
// use the final executor dispatch process task
executor.execute(new ProcessTask(ctx, cmd));
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
// 从 RemotingContext 获取 UserProcessor
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
if (processor instanceof AsyncUserProcessor) {
// processor.preHandleRequest:使用 BizContext 包装 RemotingContext,避免 RemotingContext 直接暴露给用户(因为 RemotingContext 包含 ChannelHandlerContext,可直接发送消息给对端)
// 创建 RpcAsyncContext 存根:包装 RemotingContext,内部使用其做一步发送操作
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
} else {
// processor.preHandleRequest:使用 BizContext 包装 RemotingContext
Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
// 使用 ctx.writeAndFlush(serializedResponse) 发送响应
sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
}
}
三、BizContext
使用姿势见 SOFABolt 源码分析15 - 双工通信机制的设计 中的“1.1、基于 addr 链路模式”
public class DefaultBizContext implements BizContext {
// 包裹 RemotingContext
private RemotingContext remotingCtx;
// protect 方式,只有其子类可以访问
protected RemotingContext getRemotingCtx() {
return this.remotingCtx;
}
@Override
public String getRemoteAddress() {
if (null != this.remotingCtx) {
ChannelHandlerContext channelCtx = this.remotingCtx.getChannelContext();
Channel channel = channelCtx.channel();
if (null != channel) {
return RemotingUtil.parseRemoteAddress(channel);
}
}
return "UNKNOWN_ADDRESS";
}
...
// 这里也存储了 Connection,可以用于服务端向客户端直接发起调用
@Override
public Connection getConnection() {
if (null != this.remotingCtx) {
return this.remotingCtx.getConnection();
}
return null;
}
@Override
public boolean isRequestTimeout() {
return this.remotingCtx.isRequestTimeout();
}
...
@Override
public InvokeContext getInvokeContext() {
return this.remotingCtx.getInvokeContext();
}
}
BizContext 是直接给用户程序使用的,而 RemotingContext 是程序内部使用的
public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
return new DefaultBizContext(remotingCtx);
}
四、AsyncContext
public class RpcAsyncContext implements AsyncContext {
/** remoting context */
private RemotingContext ctx;
// rpc request command:
// 1. 会根据请求中的 type 是否是 oneway 来决定是否向对端发送数据
// 2. 会将 RpcRequestCommand 中的 requestID 设置给响应
private RpcRequestCommand cmd;
private RpcRequestProcessor processor;
/** is response sent already */
private AtomicBoolean isResponseSentAlready = new AtomicBoolean();
// 创造响应,发送消息(发送还是使用 RemotingContext)
@Override
public void sendResponse(Object responseObject) {
if (isResponseSentAlready.compareAndSet(false, true)) {
processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
} else {
throw new IllegalStateException("Should not send rpc response repeatedly!");
}
}
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
if (processor instanceof AsyncUserProcessor) {
// yi'bu'chu'li
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
}
}
网友评论