类组成
- Serializer 序列化器
- Serializer 定义了序列化接口,提供了一个默认实现 HessianSerializer,我们可以通过模仿 HessianSerializer 实现 Serializer 接口来提供自己的序列化方式
- SerializerManager 是 Serializer 实现类的管理器,通过一个 Serializer[] 存储各种序列化器,数组索引下标 index 就是 Serializer 的 key,例如 HessianSerializer 的 index = 1
- CustomSerializer 序列化器
- CustomSerializer 定义了自定义序列化接口,提供了一个适配器 DefaultCustomSerializer,使得用户可以近实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
- CustomSerializerManager 是 CustomSerializer 实现类的管理器,通过一个 Map<String, CustomSerializer> 和 Map<CommandCode, CustomSerializer> 来存储各种自定义序列化器,前者 key 为请求数据的全类名,后者 key 为 CommandCode
- 各种命令
- RemotingCommand 提供了命令接口,定义了四个序列化方法与获取全局序列化方式的方法(继承自 Serializable,从 Codec 编解码的分析中可以得到,如果一个对象想被 Bolt 进行编码,必须实现 Serializable 接口)
- RpcCommand 指定了默认的全局序列化器(hessian2),并提供了三个 byte[] 数组,用于存储序列化后的相应内容,同时提供了 InvokeContext 对象,使得用户可以使用 InvokeContext 中的内容做一些逻辑;同时提供了 serialize() 、deserialize() 和 deserialize(long mask) 三种模板,前者做全序列化、中间做全反序列化、后者根据传入的 RpcDeserializeLevel 的值,决定做下列三者之一
- 仅序列化 clazzName
- 序列化 clazzName + header
- 全序列化 clazzName + header + content
- RpcRequestCommand 存储了真实的业务数据(clazzName、header、content),并提供了 customSerializer 对象(该对象通过 CustomSerializerManager 进行获取),并提供了三种类型业务数据的序列化和反序列化实现
- RpcResponseCommand 与 RpcRequestCommand 类似
- 调用入口
- 当发起请求时,例如 invokeSync() 时,RpcRemoting 会先对请求数据进行序列化,之后编码发送
- 当收到请求时,对请求消息进行解码,然后 RpcRequestProcessor 会对解码后的请求数据进行精细的反序列化;
- 处理请求完成之后,RpcRequestProcessor 会对响应消息进行序列化,之后编码发送
- 收到响应消息后,对响应消息进行解码,然后会在 RpcInvokeCallbackListener 或者 RpcResponseResolver 中对解码后的响应消息进行反序列化
三种业务数据序列化
- clazzName:因为只是 String 与 byte[] 互转,所以与 CustomSerializer 和 Serializer 无关
- header:仅仅用在 CustomSerializer 存在时
- content:当 CustomSerializer 存在时,首先使用 CustomSerializer 进行反序列化 content,如果序列化失败,再使用 Serializer 进行反序列化 content
一、Serializer 序列化器扩展
// 1. 实现 Serializer
public class MySerializer implements Serializer {
@Override
public byte[] serialize(Object obj) throws CodecException {
...
}
@Override
public <T> T deserialize(byte[] data, String classOfT) throws CodecException {
...
}
}
// 2. 注册
public static final byte mySerializer = 2;
SerializerManager.addSerializer(mySerializer, new MySerializer());
编写并注册号序列化器之后,现在可以有两种方式进行选择:
- 全局设置:
- 调用级别的设置
// 全局设置
System.setProperty(Configs.SERIALIZER, String.valueOf(MySerializer.mySerializer)); // 或者 -Dbolt.serializer=2
// 调用级别的设置
InvokeContext invokeContext = new InvokeContext();
invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, MySerializer.mySerializer);
二、CustomSerializer 序列化器扩展
使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计 中的“2.4 设置 UserProcessor 自定义线程池选择器”。简单来讲就是两步:
- 继承 DefaultCustomSerializer 实现自定义序列化器 MyCustomHeaderSerializer
- 将 MyCustomHeaderSerializer 注册到 CustomSerializerManager 中
三、源码分析
3.1 Serializer 序列化器
========================== Serializer ==========================
public interface Serializer {
byte[] serialize(final Object obj) throws CodecException;
<T> T deserialize(final byte[] data, String classOfT) throws CodecException;
}
========================== SerializerManager ==========================
public class SerializerManager {
// 序列化器集合
private static Serializer[] serializers = new Serializer[5];
// 序列化器下标
public static final byte Hessian2 = 1;
//public static final byte Json = 2;
static {
addSerializer(Hessian2, new HessianSerializer());
}
public static Serializer getSerializer(int idx) {
return serializers[idx];
}
public static void addSerializer(int idx, Serializer serializer) {
if (serializers.length <= idx) {
// 扩容 - 创建新数组
Serializer[] newSerializers = new Serializer[idx + 5];
// 将老数组的内容拷贝到新数组
System.arraycopy(serializers, 0, newSerializers, 0, serializers.length);
serializers = newSerializers;
}
serializers[idx] = serializer;
}
}
3.2 CustomSerializer 序列化器
========================== CustomSerializer ==========================
public interface CustomSerializer {
<T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext);
<T extends RequestCommand> boolean deserializeHeader(T request);
<T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext);
<T extends RequestCommand> boolean deserializeContent(T request);
<T extends ResponseCommand> boolean serializeHeader(T response);
<T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext);
<T extends ResponseCommand> boolean serializeContent(T response);
<T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext);
}
========================== DefaultCustomSerializer ==========================
public class DefaultCustomSerializer implements CustomSerializer {
@Override
public <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext) {
return false;
}
@Override
public <T extends ResponseCommand> boolean serializeHeader(T response) {
return false;
}
@Override
public <T extends RequestCommand> boolean deserializeHeader(T request) {
return false;
}
@Override
public <T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext) {
return false;
}
@Override
public <T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext) {
return false;
}
@Override
public <T extends ResponseCommand> boolean serializeContent(T response) {
return false;
}
@Override
public <T extends RequestCommand> boolean deserializeContent(T request) {
return false;
}
@Override
public <T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext) {
return false;
}
}
========================== CustomSerializerManager ==========================
public class CustomSerializerManager {
/** For rpc,key = clazzName */
private static ConcurrentHashMap<String, CustomSerializer> classCustomSerializer = new ConcurrentHashMap<String, CustomSerializer>();
/** For user defined command,key = CommandCode */
private static ConcurrentHashMap<CommandCode, CustomSerializer> commandCustomSerializer = new ConcurrentHashMap<CommandCode, CustomSerializer>();
public static void registerCustomSerializer(String className, CustomSerializer serializer) {
CustomSerializer prevSerializer = classCustomSerializer.putIfAbsent(className, serializer);
// 只能注册一次,不可修改
if (prevSerializer != null) {
throw new RuntimeException();
}
}
public static CustomSerializer getCustomSerializer(String className) {
if (!classCustomSerializer.isEmpty()) {
return classCustomSerializer.get(className);
}
return null;
}
public static void registerCustomSerializer(CommandCode code, CustomSerializer serializer) {
CustomSerializer prevSerializer = commandCustomSerializer.putIfAbsent(code, serializer);
if (prevSerializer != null) {
throw new RuntimeException();
}
}
public static CustomSerializer getCustomSerializer(CommandCode code) {
if (!commandCustomSerializer.isEmpty()) {
return commandCustomSerializer.get(code);
}
return null;
}
public static void clear() {
classCustomSerializer.clear();
commandCustomSerializer.clear();
}
}
- DefaultCustomSerializer 是 CustomSerializer 一个适配器,所有的方法都返回 false,使得用户可以仅实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
- Map<String, CustomSerializer> 通常用在 rpc 中;Map<CommandCode, CustomSerializer> 通常用在用户自定义 CommandCode 时。
3.3 各种命令
========================== RemotingCommand ==========================
public interface RemotingCommand extends Serializable {
...
// 可用于 CustomSerializer
InvokeContext getInvokeContext();
// 获取序列化器的key(即数组下标)
byte getSerializer();
// Serialize all parts of remoting command
void serialize();
// Deserialize all parts of remoting command
void deserialize();
// Serialize content of remoting command
void serializeContent(InvokeContext invokeContext);
// Deserialize content of remoting command
void deserializeContent(InvokeContext invokeContext);
}
========================== RpcCommand ==========================
public abstract class RpcCommand implements RemotingCommand {
// 请求 / 响应 / 心跳
private CommandCode cmdCode;
// 序列化器:默认为 hessian2
private byte serializer = ConfigManager.serializer;
/** The length of clazz */
private short clazzLength = 0;
private short headerLength = 0;
private int contentLength = 0;
// clazzName: String <-> byte[]
private byte[] clazz;
// header: Object <-> byte[]
private byte[] header;
// content: Object <-> byte[]
private byte[] content;
// 调用上下文:其内的属性可能会用于自定义序列化器,但是 invokeContext 本身不会传到对端
private InvokeContext invokeContext;
// 全部序列化
@Override
public void serialize() throws SerializationException {
this.serializeClazz();
this.serializeHeader(this.invokeContext);
this.serializeContent(this.invokeContext);
}
// 全部反序列化
@Override
public void deserialize() throws DeserializationException {
this.deserializeClazz();
this.deserializeHeader(this.invokeContext);
this.deserializeContent(this.invokeContext);
}
/**
* Deserialize according to mask.
* <ol>
* <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_CLAZZ}, only deserialize clazz - only one part.</li>
* <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_HEADER}, deserialize clazz and header - two parts.</li>
* <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_ALL}, deserialize clazz, header and content - all three parts.</li>
* </ol>
*/
public void deserialize(long mask) throws DeserializationException {
if (mask <= RpcDeserializeLevel.DESERIALIZE_CLAZZ) {
// 仅反序列化 clazzName
this.deserializeClazz();
} else if (mask <= RpcDeserializeLevel.DESERIALIZE_HEADER) {
// 仅反序列化 clazzName + header
this.deserializeClazz();
this.deserializeHeader(this.getInvokeContext());
} else if (mask <= RpcDeserializeLevel.DESERIALIZE_ALL) {
// 反序列化 clazzName + header + content
this.deserialize();
}
}
// Serialize content class.
public void serializeClazz() {
}
// Deserialize the content class.
public void deserializeClazz() {
}
// Serialize the header.
public void serializeHeader(InvokeContext invokeContext) {
}
// Deserialize the header.
public void deserializeHeader(InvokeContext invokeContext) {
}
// Serialize the content.
@Override
public void serializeContent(InvokeContext invokeContext) {
}
// Deserialize the content.
@Override
public void deserializeContent(InvokeContext invokeContext) {
}
@Override
public byte getSerializer() {
return serializer;
}
public void setSerializer(byte serializer) {
this.serializer = serializer;
}
}
========================== RpcRequestCommand ==========================
public class RpcRequestCommand extends RequestCommand {
// 请求对象类型
private String requestClass;
// 请求头
private Object requestHeader;
// 真正的请求对象
private Object requestObject;
// 自定义序列化器
private CustomSerializer customSerializer;
@Override
public void serializeClazz() {
if (this.requestClass != null) {
// 直接 String -> byte[]
byte[] clz = this.requestClass.getBytes(Configs.DEFAULT_CHARSET);
this.setClazz(clz);
}
}
@Override
public void deserializeClazz() {
// this.getRequestClass() != null 表示已经反序列化过了 - 避免重复反序列化
if (this.getClazz() != null && this.getRequestClass() == null) {
// 直接 byte[] -> String
this.setRequestClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
}
}
// 如果 customSerializer 存在,才会做 serializeHeader
@Override
public void serializeHeader(InvokeContext invokeContext) {
// 如果 customSerializer 存在,才会做 serializeHeader
if (this.getCustomSerializer() != null) {
// 执行 customSerializer 自行实现的逻辑(此处就可以使用 invokeContext 做一些逻辑了)
this.getCustomSerializer().serializeHeader(this, invokeContext);
}
}
@Override
public void deserializeHeader(InvokeContext invokeContext) {
// this.getRequestHeader() != null 表示已经反序列化过了 - 避免重复反序列化
if (this.getHeader() != null && this.getRequestHeader() == null) {
if (this.getCustomSerializer() != null) {
// 执行 customSerializer 自行实现的逻辑
this.getCustomSerializer().deserializeHeader(this);
}
}
}
@Override
public void serializeContent(InvokeContext invokeContext) {
if (this.requestObject != null) {
// 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做 content 的序列化,如果失败,使用 Serializer 的序列化器
if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this, invokeContext)) {
return;
}
// 如果 customSerializer 不存在或序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做序列化操作
// RpcCommand.serializer 默认为1,即 hessian2 序列化器,可以通过 -Dbolt.serializer=1 来指定(前提是需要将指定的序列化器注册到 SerializerManager 中)- 这种是全局的;
// 可以通过 invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, 序列化器序号来指定) - 这种可以为每一次的调用指定动态序列化器
this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.requestObject));
}
}
@Override
public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
if (this.getRequestObject() == null) {
// 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做反序列化
if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this)) {
return;
}
// 如果 customSerializer 不存在或反序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做反序列化操作
if (this.getContent() != null) {
this.setRequestObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.requestClass));
}
}
}
// 获取自定义序列化器
public CustomSerializer getCustomSerializer() {
// 如果有了,直接返回
if (this.customSerializer != null) {
return customSerializer;
}
// 先根据请求数据的全类名获取 CustomSerializer;如果获取不到,再根据 CommandCode 获取
if (this.requestClass != null) {
this.customSerializer = CustomSerializerManager.getCustomSerializer(this.requestClass);
}
if (this.customSerializer == null) {
this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
}
return this.customSerializer;
}
}
========================== RpcResponseCommand ==========================
public class RpcResponseCommand extends ResponseCommand {
// 响应对象类型
private String responseClass;
// 响应头
private Object responseHeader;
// 真正的响应对象
private Object responseObject;
// 自定义序列化器
private CustomSerializer customSerializer;
@Override
public void serializeClazz() {
if (this.getResponseClass() != null) {
// 直接 String -> byte[]
byte[] clz = this.getResponseClass().getBytes(Configs.DEFAULT_CHARSET);
this.setClazz(clz);
}
}
@Override
public void deserializeClazz() {
// this.getResponseClass() != null 表示已经反序列化过了 - 避免重复反序列化
if (this.getClazz() != null && this.getResponseClass() == null) {
this.setResponseClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
}
}
// 如果 customSerializer 存在,才会做 serializeHeader
@Override
public void serializeHeader(InvokeContext invokeContext) {
if (this.getCustomSerializer() != null) {
// 如果 customSerializer 存在,才会做 serializeHeader
this.getCustomSerializer().serializeHeader(this);
}
}
@Override
public void deserializeHeader(InvokeContext invokeContext) {
// this.getResponseHeader() != null 表示已经反序列化过了 - 避免重复反序列化
if (this.getHeader() != null && this.getResponseHeader() == null) {
if (this.getCustomSerializer() != null) {
this.getCustomSerializer().deserializeHeader(this, invokeContext);
}
}
}
@Override
public void serializeContent(InvokeContext invokeContext) throws SerializationException {
if (this.getResponseObject() != null) {
if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this)) {
return;
}
this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.responseObject));
}
}
@Override
public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
if (this.getResponseObject() == null) {
if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this, invokeContext)) {
return;
}
if (this.getContent() != null) {
this.setResponseObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.responseClass));
}
}
}
public CustomSerializer getCustomSerializer() {
if (this.customSerializer != null) {
return customSerializer;
}
if (this.responseClass != null) {
this.customSerializer = CustomSerializerManager.getCustomSerializer(this.responseClass);
}
if (this.customSerializer == null) {
this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
}
return this.customSerializer;
}
}
3.4 调用入口
========================== RpcRemoting ==========================
public abstract class RpcRemoting extends BaseRemoting {
public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 创建请求命令(序列化)
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
...
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
timeoutMillis);
responseCommand.setInvokeContext(invokeContext);
// 解析响应(反序列化)
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
// 创建 RpcRequestCommand
RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);
if (null != invokeContext) {
// 设置调用级别的 Serializer
Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
if (null != clientCustomSerializer) {
command.setSerializer((Byte) clientCustomSerializer);
}
}
command.setRequestClass(request.getClass().getName());
command.setInvokeContext(invokeContext);
// 请求的序列化
command.serialize();
return command;
}
}
========================== RpcRequestProcessor ==========================
public class RpcRequestProcessor extends AbstractRemotingProcessor<RpcRequestCommand> {
@Override
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
// 首先反序列化 clazzName,因为需要 clazzName 来获取 UserProcessor,如果处理 clazzName 的 UserProcessor 不存在,则直接返回错误
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
return;
}
// 根据clazz获取UserProcessor
UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
...
// 如果指定在IO线程处理请求,则直接反序列化全部,创建ProcessTask,直接执行
if (userProcessor.processInIOThread()) {
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
return;
}
// process in io thread
new ProcessTask(ctx, cmd).run();
return;// end
}
// 如果指定不是在IO线程处理请求,则先获取线程池,创建ProcessTask,在新的线程池执行
Executor executor;
// 看是否配置了 UserProcessor.executorSelector,即线程池选择器,
// 如果配置了:则需要反序列化出 header,因为 executorSelector 需要根据 header 去选择 executor;content 在异步线程池进行反序列化
// 如果没有配置:则 header 和 content 都在选出的异步线程池进行反序列化
if (null == userProcessor.getExecutorSelector()) {
executor = userProcessor.getExecutor();
} else {
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_HEADER)) {
return;
}
executor = userProcessor.getExecutorSelector().select(cmd.getRequestClass(),
cmd.getRequestHeader());
}
...
executor.execute(new ProcessTask(ctx, cmd));
}
@Override
public void doProcess(RemotingContext ctx, RpcRequestCommand cmd) {
...
// 反序列化全部
if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
return;
}
dispatchToUserProcessor(ctx, cmd);
}
public void sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response) {
final int id = response.getId();
if (type != RpcCommandType.REQUEST_ONEWAY) {
RemotingCommand serializedResponse = response;
// 响应序列化
response.serialize();
// Netty 发送响应
ctx.writeAndFlush(serializedResponse);
}
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
... 消息处理
// 发送响应(创建 RpcResponseCommand,指定序列化器为请求的序列化器)
sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
}
private boolean deserializeRequestCommand(RemotingContext ctx, RpcRequestCommand cmd, int level) {
// 根据传入的序列化级别来序列化不同的内容
cmd.deserialize(level);
}
class ProcessTask implements Runnable {
@Override
public void run() {
RpcRequestProcessor.this.doProcess(ctx, msg);
}
}
}
public class RpcCommandFactory implements CommandFactory {
@Override
public RpcRequestCommand createRequestCommand(Object requestObject) {
return new RpcRequestCommand(requestObject);
}
@Override
public RpcResponseCommand createResponse(Object responseObject, RemotingCommand requestCmd) {
RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
if (null != responseObject) {
response.setResponseClass(responseObject.getClass().getName());
} else {
response.setResponseClass(null);
}
// 响应与请求用的是同一种序列化器
response.setSerializer(requestCmd.getSerializer());
...
return response;
}
}
========================== RpcInvokeCallbackListener ==========================
public class RpcInvokeCallbackListener implements InvokeCallbackListener {
@Override
public void onResponse(InvokeFuture future) {
InvokeCallback callback = future.getInvokeCallback();
if (callback != null) {
CallbackTask task = new CallbackTask(this.getRemoteAddress(), future);
if (callback.getExecutor() != null) {
callback.getExecutor().execute(task);
} else {
task.run();
}
}
}
class CallbackTask implements Runnable {
@Override
public void run() {
InvokeCallback callback = future.getInvokeCallback();
// 设置响应
ResponseCommand response = (ResponseCommand) future.waitResponse(0);
response.setInvokeContext(future.getInvokeContext());
RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
// 反序列化
response.deserialize();
// 回调
callback.onResponse(rpcResponse.getResponseObject());
} // end of run
}
}
========================== RpcResponseResolver ==========================
public class RpcResponseResolver {
public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) {
return toResponseObject(responseCommand);
}
private static Object toResponseObject(ResponseCommand responseCommand) {
RpcResponseCommand response = (RpcResponseCommand) responseCommand;
// 响应的反序列化
response.deserialize();
return response.getResponseObject();
}
}
网友评论