美文网首页
SOFABolt 源码分析20 - Serializer 序列化

SOFABolt 源码分析20 - Serializer 序列化

作者: 原水寒 | 来源:发表于2018-10-20 23:35 被阅读85次
    image.png

    类组成

    • Serializer 序列化器
    • Serializer 定义了序列化接口,提供了一个默认实现 HessianSerializer,我们可以通过模仿 HessianSerializer 实现 Serializer 接口来提供自己的序列化方式
    • SerializerManager 是 Serializer 实现类的管理器,通过一个 Serializer[] 存储各种序列化器,数组索引下标 index 就是 Serializer 的 key,例如 HessianSerializer 的 index = 1
    • CustomSerializer 序列化器
    • CustomSerializer 定义了自定义序列化接口,提供了一个适配器 DefaultCustomSerializer,使得用户可以近实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
    • CustomSerializerManager 是 CustomSerializer 实现类的管理器,通过一个 Map<String, CustomSerializer> 和 Map<CommandCode, CustomSerializer> 来存储各种自定义序列化器,前者 key 为请求数据的全类名,后者 key 为 CommandCode
    • 各种命令
    • RemotingCommand 提供了命令接口,定义了四个序列化方法与获取全局序列化方式的方法(继承自 Serializable,从 Codec 编解码的分析中可以得到,如果一个对象想被 Bolt 进行编码,必须实现 Serializable 接口)
    • RpcCommand 指定了默认的全局序列化器(hessian2),并提供了三个 byte[] 数组,用于存储序列化后的相应内容,同时提供了 InvokeContext 对象,使得用户可以使用 InvokeContext 中的内容做一些逻辑;同时提供了 serialize() 、deserialize() 和 deserialize(long mask) 三种模板,前者做全序列化、中间做全反序列化、后者根据传入的 RpcDeserializeLevel 的值,决定做下列三者之一
    • 仅序列化 clazzName
    • 序列化 clazzName + header
    • 全序列化 clazzName + header + content
    • RpcRequestCommand 存储了真实的业务数据(clazzName、header、content),并提供了 customSerializer 对象(该对象通过 CustomSerializerManager 进行获取),并提供了三种类型业务数据的序列化和反序列化实现
    • RpcResponseCommand 与 RpcRequestCommand 类似
    • 调用入口
    • 当发起请求时,例如 invokeSync() 时,RpcRemoting 会先对请求数据进行序列化,之后编码发送
    • 当收到请求时,对请求消息进行解码,然后 RpcRequestProcessor 会对解码后的请求数据进行精细的反序列化;
    • 处理请求完成之后,RpcRequestProcessor 会对响应消息进行序列化,之后编码发送
    • 收到响应消息后,对响应消息进行解码,然后会在 RpcInvokeCallbackListener 或者 RpcResponseResolver 中对解码后的响应消息进行反序列化

    三种业务数据序列化

    • clazzName:因为只是 String 与 byte[] 互转,所以与 CustomSerializer 和 Serializer 无关
    • header:仅仅用在 CustomSerializer 存在时
    • content:当 CustomSerializer 存在时,首先使用 CustomSerializer 进行反序列化 content,如果序列化失败,再使用 Serializer 进行反序列化 content

    一、Serializer 序列化器扩展

    // 1. 实现 Serializer
    public class MySerializer implements Serializer {
         @Override
         public byte[] serialize(Object obj) throws CodecException {
             ...
         }
    
         @Override
         public <T> T deserialize(byte[] data, String classOfT) throws CodecException {
             ...
         }
    }
    // 2. 注册
    public static final byte mySerializer = 2;
    SerializerManager.addSerializer(mySerializer, new MySerializer());
    

    编写并注册号序列化器之后,现在可以有两种方式进行选择:

    • 全局设置:
    • 调用级别的设置
    // 全局设置
    System.setProperty(Configs.SERIALIZER, String.valueOf(MySerializer.mySerializer)); // 或者 -Dbolt.serializer=2
    
    // 调用级别的设置
    InvokeContext invokeContext = new InvokeContext();
    invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, MySerializer.mySerializer);
    

    二、CustomSerializer 序列化器扩展

    使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计 中的“2.4 设置 UserProcessor 自定义线程池选择器”。简单来讲就是两步:

    1. 继承 DefaultCustomSerializer 实现自定义序列化器 MyCustomHeaderSerializer
    2. 将 MyCustomHeaderSerializer 注册到 CustomSerializerManager 中

    三、源码分析

    3.1 Serializer 序列化器

    ========================== Serializer ==========================
    public interface Serializer {
        byte[] serialize(final Object obj) throws CodecException;
        <T> T deserialize(final byte[] data, String classOfT) throws CodecException;
    }
    
    ========================== SerializerManager ==========================
    public class SerializerManager {
        // 序列化器集合
        private static Serializer[] serializers = new Serializer[5];
        // 序列化器下标
        public static final byte    Hessian2    = 1;
        //public static final byte    Json        = 2;
    
        static {
            addSerializer(Hessian2, new HessianSerializer());
        }
    
        public static Serializer getSerializer(int idx) {
            return serializers[idx];
        }
    
        public static void addSerializer(int idx, Serializer serializer) {
            if (serializers.length <= idx) {
                // 扩容 - 创建新数组
                Serializer[] newSerializers = new Serializer[idx + 5];
                // 将老数组的内容拷贝到新数组
                System.arraycopy(serializers, 0, newSerializers, 0, serializers.length);
                serializers = newSerializers;
            }
            serializers[idx] = serializer;
        }
    }
    

    3.2 CustomSerializer 序列化器

    ========================== CustomSerializer ==========================
    public interface CustomSerializer {
        <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext);
        <T extends RequestCommand> boolean deserializeHeader(T request);
        <T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext);
        <T extends RequestCommand> boolean deserializeContent(T request);
    
        <T extends ResponseCommand> boolean serializeHeader(T response);
        <T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext);
        <T extends ResponseCommand> boolean serializeContent(T response);
        <T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext);
    }
    
    ========================== DefaultCustomSerializer ==========================
    public class DefaultCustomSerializer implements CustomSerializer {
        @Override
        public <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext) {
            return false;
        }
    
        @Override
        public <T extends ResponseCommand> boolean serializeHeader(T response) {
            return false;
        }
    
        @Override
        public <T extends RequestCommand> boolean deserializeHeader(T request) {
            return false;
        }
    
        @Override
        public <T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext) {
            return false;
        }
    
        @Override
        public <T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext) {
            return false;
        }
    
        @Override
        public <T extends ResponseCommand> boolean serializeContent(T response) {
            return false;
        }
    
        @Override
        public <T extends RequestCommand> boolean deserializeContent(T request) {
            return false;
        }
    
        @Override
        public <T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext) {
            return false;
        }
    }
    
    ========================== CustomSerializerManager ==========================
    public class CustomSerializerManager {
        /** For rpc,key = clazzName */
        private static ConcurrentHashMap<String, CustomSerializer>        classCustomSerializer   = new ConcurrentHashMap<String, CustomSerializer>();
        /** For user defined command,key = CommandCode */
        private static ConcurrentHashMap<CommandCode, CustomSerializer> commandCustomSerializer = new ConcurrentHashMap<CommandCode, CustomSerializer>();
    
        public static void registerCustomSerializer(String className, CustomSerializer serializer) {
            CustomSerializer prevSerializer = classCustomSerializer.putIfAbsent(className, serializer);
            // 只能注册一次,不可修改
            if (prevSerializer != null) {
                throw new RuntimeException();
            }
        }
    
        public static CustomSerializer getCustomSerializer(String className) {
            if (!classCustomSerializer.isEmpty()) {
                return classCustomSerializer.get(className);
            }
            return null;
        }
    
        public static void registerCustomSerializer(CommandCode code, CustomSerializer serializer) {
            CustomSerializer prevSerializer = commandCustomSerializer.putIfAbsent(code, serializer);
            if (prevSerializer != null) {
                throw new RuntimeException();
            }
        }
    
        public static CustomSerializer getCustomSerializer(CommandCode code) {
            if (!commandCustomSerializer.isEmpty()) {
                return commandCustomSerializer.get(code);
            }
            return null;
        }
    
        public static void clear() {
            classCustomSerializer.clear();
            commandCustomSerializer.clear();
        }
    }
    
    • DefaultCustomSerializer 是 CustomSerializer 一个适配器,所有的方法都返回 false,使得用户可以仅实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
    • Map<String, CustomSerializer> 通常用在 rpc 中;Map<CommandCode, CustomSerializer> 通常用在用户自定义 CommandCode 时。

    3.3 各种命令

    ========================== RemotingCommand ==========================
    public interface RemotingCommand extends Serializable {
        ...
        // 可用于 CustomSerializer
        InvokeContext getInvokeContext();
    
        // 获取序列化器的key(即数组下标)
        byte getSerializer();
    
        // Serialize all parts of remoting command
        void serialize();
    
        // Deserialize all parts of remoting command
        void deserialize();
    
        // Serialize content of remoting command
        void serializeContent(InvokeContext invokeContext);
    
        // Deserialize content of remoting command
        void deserializeContent(InvokeContext invokeContext);
    }
    
    ========================== RpcCommand ==========================
    public abstract class RpcCommand implements RemotingCommand {
        // 请求 / 响应 / 心跳
        private CommandCode       cmdCode;
        // 序列化器:默认为 hessian2
        private byte              serializer       = ConfigManager.serializer;
        /** The length of clazz */
        private short             clazzLength      = 0;
        private short             headerLength     = 0;
        private int               contentLength    = 0;
        // clazzName: String <-> byte[]
        private byte[]            clazz;
        // header: Object <-> byte[]
        private byte[]            header;
        // content: Object <-> byte[]
        private byte[]            content;
        // 调用上下文:其内的属性可能会用于自定义序列化器,但是 invokeContext 本身不会传到对端
        private InvokeContext     invokeContext;
    
        // 全部序列化
        @Override
        public void serialize() throws SerializationException {
            this.serializeClazz();
            this.serializeHeader(this.invokeContext);
            this.serializeContent(this.invokeContext);
        }
    
        // 全部反序列化
        @Override
        public void deserialize() throws DeserializationException {
            this.deserializeClazz();
            this.deserializeHeader(this.invokeContext);
            this.deserializeContent(this.invokeContext);
        }
    
        /**
         * Deserialize according to mask.
         * <ol>
         *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_CLAZZ}, only deserialize clazz - only one part.</li>
         *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_HEADER}, deserialize clazz and header - two parts.</li>
         *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_ALL}, deserialize clazz, header and content - all three parts.</li>
         * </ol>
         */
        public void deserialize(long mask) throws DeserializationException {
            if (mask <= RpcDeserializeLevel.DESERIALIZE_CLAZZ) {
                // 仅反序列化 clazzName
                this.deserializeClazz();
            } else if (mask <= RpcDeserializeLevel.DESERIALIZE_HEADER) {
                // 仅反序列化 clazzName + header
                this.deserializeClazz();
                this.deserializeHeader(this.getInvokeContext());
            } else if (mask <= RpcDeserializeLevel.DESERIALIZE_ALL) {
                // 反序列化 clazzName + header + content
                this.deserialize();
            }
        }
    
        // Serialize content class.
        public void serializeClazz() {
        }
    
        // Deserialize the content class.
        public void deserializeClazz() {
        }
    
        // Serialize the header.
        public void serializeHeader(InvokeContext invokeContext) {
        }
    
        // Deserialize the header.
        public void deserializeHeader(InvokeContext invokeContext) {
        }
    
        // Serialize the content.
        @Override
        public void serializeContent(InvokeContext invokeContext) {
        }
    
        // Deserialize the content.
        @Override
        public void deserializeContent(InvokeContext invokeContext) {
        }
    
        @Override
        public byte getSerializer() {
            return serializer;
        }
    
        public void setSerializer(byte serializer) {
            this.serializer = serializer;
        }
    }
    
    ========================== RpcRequestCommand ==========================
    public class RpcRequestCommand extends RequestCommand {
        // 请求对象类型
        private String            requestClass;
        // 请求头
        private Object            requestHeader;
        // 真正的请求对象
        private Object            requestObject;
        // 自定义序列化器
        private CustomSerializer  customSerializer;
    
        @Override
        public void serializeClazz() {
            if (this.requestClass != null) {
                // 直接 String -> byte[]
                byte[] clz = this.requestClass.getBytes(Configs.DEFAULT_CHARSET);
                this.setClazz(clz);
            }
        }
    
        @Override
        public void deserializeClazz() {
            // this.getRequestClass() != null 表示已经反序列化过了 - 避免重复反序列化
            if (this.getClazz() != null && this.getRequestClass() == null) {
                // 直接 byte[] -> String
                this.setRequestClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
            }
        }
    
        // 如果 customSerializer 存在,才会做 serializeHeader
        @Override
        public void serializeHeader(InvokeContext invokeContext) {
            // 如果 customSerializer 存在,才会做 serializeHeader
            if (this.getCustomSerializer() != null) {
                // 执行 customSerializer 自行实现的逻辑(此处就可以使用 invokeContext 做一些逻辑了)
                this.getCustomSerializer().serializeHeader(this, invokeContext);
            }
        }
    
        @Override
        public void deserializeHeader(InvokeContext invokeContext) {
            // this.getRequestHeader() != null 表示已经反序列化过了 - 避免重复反序列化
            if (this.getHeader() != null && this.getRequestHeader() == null) {
                if (this.getCustomSerializer() != null) {
                    // 执行 customSerializer 自行实现的逻辑
                    this.getCustomSerializer().deserializeHeader(this);
                }
            }
        }
    
        @Override
        public void serializeContent(InvokeContext invokeContext) {
            if (this.requestObject != null) {
                // 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做 content 的序列化,如果失败,使用 Serializer 的序列化器
                if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this, invokeContext)) {
                    return;
                }
                // 如果 customSerializer 不存在或序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做序列化操作
                // RpcCommand.serializer 默认为1,即 hessian2 序列化器,可以通过 -Dbolt.serializer=1 来指定(前提是需要将指定的序列化器注册到 SerializerManager 中)- 这种是全局的;
                // 可以通过 invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, 序列化器序号来指定) - 这种可以为每一次的调用指定动态序列化器
                this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.requestObject));
            }
        }
    
        @Override
        public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
            if (this.getRequestObject() == null) {
                // 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做反序列化
                if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this)) {
                    return;
                }
                // 如果 customSerializer 不存在或反序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做反序列化操作
                if (this.getContent() != null) {
                    this.setRequestObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.requestClass));
                }
            }
        }
    
        // 获取自定义序列化器
        public CustomSerializer getCustomSerializer() {
            // 如果有了,直接返回
            if (this.customSerializer != null) {
                return customSerializer;
            }
            // 先根据请求数据的全类名获取 CustomSerializer;如果获取不到,再根据 CommandCode 获取
            if (this.requestClass != null) {
                this.customSerializer = CustomSerializerManager.getCustomSerializer(this.requestClass);
            }
            if (this.customSerializer == null) {
                this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
            }
            return this.customSerializer;
        }
    }
    
    ========================== RpcResponseCommand ==========================
    public class RpcResponseCommand extends ResponseCommand {
        // 响应对象类型
        private String            responseClass;
        // 响应头
        private Object            responseHeader;
        // 真正的响应对象
        private Object            responseObject;
        // 自定义序列化器
        private CustomSerializer  customSerializer;
    
        @Override
        public void serializeClazz() {
            if (this.getResponseClass() != null) {
                // 直接 String -> byte[]
                byte[] clz = this.getResponseClass().getBytes(Configs.DEFAULT_CHARSET);
                this.setClazz(clz);
            }
        }
    
        @Override
        public void deserializeClazz() {
            // this.getResponseClass() != null 表示已经反序列化过了 - 避免重复反序列化
            if (this.getClazz() != null && this.getResponseClass() == null) {
                this.setResponseClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
            }
        }
    
        // 如果 customSerializer 存在,才会做 serializeHeader
        @Override
        public void serializeHeader(InvokeContext invokeContext) {
            if (this.getCustomSerializer() != null) {
                // 如果 customSerializer 存在,才会做 serializeHeader
                this.getCustomSerializer().serializeHeader(this);
            }
        }
    
        @Override
        public void deserializeHeader(InvokeContext invokeContext) {
            // this.getResponseHeader() != null 表示已经反序列化过了 - 避免重复反序列化
            if (this.getHeader() != null && this.getResponseHeader() == null) {
                if (this.getCustomSerializer() != null) {
                    this.getCustomSerializer().deserializeHeader(this, invokeContext);
                }
            }
        }
    
        @Override
        public void serializeContent(InvokeContext invokeContext) throws SerializationException {
            if (this.getResponseObject() != null) {
                if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this)) {
                    return;
                }
    
                this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.responseObject));
            }
        }
    
        @Override
        public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
            if (this.getResponseObject() == null) {
                if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this, invokeContext)) {
                    return;
                }
                if (this.getContent() != null) {
                    this.setResponseObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.responseClass));
                }
            }
        }
    
        public CustomSerializer getCustomSerializer() {
            if (this.customSerializer != null) {
                return customSerializer;
            }
            if (this.responseClass != null) {
                this.customSerializer = CustomSerializerManager.getCustomSerializer(this.responseClass);
            }
            if (this.customSerializer == null) {
                this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
            }
            return this.customSerializer;
        }
    }
    
    

    3.4 调用入口

    ========================== RpcRemoting ==========================
    public abstract class RpcRemoting extends BaseRemoting {
        public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
            // 创建请求命令(序列化)
            RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
            ...
            ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
                timeoutMillis);
            responseCommand.setInvokeContext(invokeContext);
            // 解析响应(反序列化)
            Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
            return responseObject;
        }
    
        protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
            // 创建 RpcRequestCommand
            RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);
    
            if (null != invokeContext) {
                // 设置调用级别的 Serializer
                Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
                if (null != clientCustomSerializer) {
                    command.setSerializer((Byte) clientCustomSerializer);
                }
            }
            command.setRequestClass(request.getClass().getName());
            command.setInvokeContext(invokeContext);
            // 请求的序列化
            command.serialize();
            return command;
        }
    }
    
    ========================== RpcRequestProcessor ==========================
    public class RpcRequestProcessor extends AbstractRemotingProcessor<RpcRequestCommand> {
        @Override
        public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
            // 首先反序列化 clazzName,因为需要 clazzName 来获取 UserProcessor,如果处理 clazzName 的 UserProcessor 不存在,则直接返回错误
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
                return;
            }
            // 根据clazz获取UserProcessor
            UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
            ...
    
            // 如果指定在IO线程处理请求,则直接反序列化全部,创建ProcessTask,直接执行
            if (userProcessor.processInIOThread()) {
                if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
                    return;
                }
                // process in io thread
                new ProcessTask(ctx, cmd).run();
                return;// end
            }
    
            // 如果指定不是在IO线程处理请求,则先获取线程池,创建ProcessTask,在新的线程池执行
            Executor executor;
            // 看是否配置了 UserProcessor.executorSelector,即线程池选择器,
            // 如果配置了:则需要反序列化出 header,因为 executorSelector 需要根据 header 去选择 executor;content 在异步线程池进行反序列化
            // 如果没有配置:则 header 和 content 都在选出的异步线程池进行反序列化
            if (null == userProcessor.getExecutorSelector()) {
                executor = userProcessor.getExecutor();
            } else {
                if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_HEADER)) {
                    return;
                }
                executor = userProcessor.getExecutorSelector().select(cmd.getRequestClass(),
                    cmd.getRequestHeader());
            }
            ...
            executor.execute(new ProcessTask(ctx, cmd));
        }
    
        @Override
        public void doProcess(RemotingContext ctx, RpcRequestCommand cmd) {
            ...
            // 反序列化全部
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
                return;
            }
            dispatchToUserProcessor(ctx, cmd);
        }
    
        public void sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response) {
            final int id = response.getId();
            if (type != RpcCommandType.REQUEST_ONEWAY) {
                RemotingCommand serializedResponse = response;
                // 响应序列化
                response.serialize();
                // Netty 发送响应
                ctx.writeAndFlush(serializedResponse);
            }
        }
        
        private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
            ... 消息处理
            // 发送响应(创建 RpcResponseCommand,指定序列化器为请求的序列化器)
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
        }
        
        private boolean deserializeRequestCommand(RemotingContext ctx, RpcRequestCommand cmd, int level) {
            // 根据传入的序列化级别来序列化不同的内容
            cmd.deserialize(level);
        }
        
        class ProcessTask implements Runnable {
            @Override
            public void run() {
                RpcRequestProcessor.this.doProcess(ctx, msg);
            }
        }
    }
    
    public class RpcCommandFactory implements CommandFactory {
        @Override
        public RpcRequestCommand createRequestCommand(Object requestObject) {
            return new RpcRequestCommand(requestObject);
        }
    
        @Override
        public RpcResponseCommand createResponse(Object responseObject, RemotingCommand requestCmd) {
            RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
            if (null != responseObject) {
                response.setResponseClass(responseObject.getClass().getName());
            } else {
                response.setResponseClass(null);
            }
            // 响应与请求用的是同一种序列化器
            response.setSerializer(requestCmd.getSerializer());
            ...
            return response;
        }
    }
    ========================== RpcInvokeCallbackListener ==========================
    public class RpcInvokeCallbackListener implements InvokeCallbackListener {
        @Override
        public void onResponse(InvokeFuture future) {
            InvokeCallback callback = future.getInvokeCallback();
            if (callback != null) {
                CallbackTask task = new CallbackTask(this.getRemoteAddress(), future);
                if (callback.getExecutor() != null) {
                    callback.getExecutor().execute(task);
                } else {
                    task.run();
                }
            }
        }
    
        class CallbackTask implements Runnable {
            @Override
            public void run() {
                InvokeCallback callback = future.getInvokeCallback();
                // 设置响应
                ResponseCommand response = (ResponseCommand) future.waitResponse(0);
                response.setInvokeContext(future.getInvokeContext());
                RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
                // 反序列化
                response.deserialize();
                // 回调
                callback.onResponse(rpcResponse.getResponseObject());
            } // end of run
        }
    }
    
    ========================== RpcResponseResolver ==========================
    public class RpcResponseResolver {
        public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) {
            return toResponseObject(responseCommand);
        }
    
        private static Object toResponseObject(ResponseCommand responseCommand) {
            RpcResponseCommand response = (RpcResponseCommand) responseCommand;
            // 响应的反序列化
            response.deserialize();
            return response.getResponseObject();
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:SOFABolt 源码分析20 - Serializer 序列化

          本文链接:https://www.haomeiwen.com/subject/qlsnoftx.html