美文网首页
1.sofa-bolt协议

1.sofa-bolt协议

作者: Young_5942 | 来源:发表于2020-08-10 21:49 被阅读0次

    sofa-bolt协议

    协议使用私有二进制协议。bolt协议的第一个版本忽略了协议的版本号,所以设计了第二个版本的协议,增加了协议的版本号。
    其协议构成如下(图中为响应帧的布局;请求帧在 respstatus 的位置是 4 字节的 timeout,因此请求头为 24 字节,响应头为 22 字节):
     * 0     1     2     3     4           6           8          10     11    12          14          16
     * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
     * |proto| ver1| type| cmdcode   |ver2 |   requestId           |codec|switch|respstatus |  classLen |
     * +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
     * |headerLen  | contentLen            |                      ...                                   |
     * +-----------------------------------+                                                            +
     * |               className + header  + content  bytes                                             |
     * +                                                                                                +
     * |                               ... ...                                  | CRC32(optional)       |
     * +------------------------------------------------------------------------------------------------+
    
    协议的顶层接口:Protocol 
    包含了 Command的Encoder/Decoder,CommandHandler,CommandFactory
    将二进制按照上述协议构成解析为Command,以及将Command编码成二进制,包含了command的处理器和command的创建工厂
    
    协议子类实现 RpcProtocolV2
    /**
     * Second revision of the bolt RPC protocol.
     *
     * <p>Bundles the collaborators needed to speak the protocol: the command encoder and
     * decoder, the heartbeat trigger, the command handler and the command factory. All of
     * them are created once in the constructor and never replaced afterwards.
     */
    public class RpcProtocolV2 implements Protocol {
        /* because the design defect, the version is neglected in RpcProtocol, so we design RpcProtocolV2 and add protocol version. */
        /** protocol code, written as the first byte of every encoded frame */
        public static final byte PROTOCOL_CODE       = (byte) 2;
        /** version 1, is the same with RpcProtocol */
        public static final byte PROTOCOL_VERSION_1  = (byte) 1;
        /** version 2, is the protocol version for RpcProtocolV2 */
        public static final byte PROTOCOL_VERSION_2  = (byte) 2;

        /**
         * in contrast to protocol v1,
         * one more byte is used as protocol version,
         * and another one is used as protocol switch
         */
        private static final int REQUEST_HEADER_LEN  = 22 + 2;
        private static final int RESPONSE_HEADER_LEN = 20 + 2;

        // Assigned exactly once in the constructor, hence final.
        private final CommandEncoder   encoder;
        private final CommandDecoder   decoder;
        private final HeartbeatTrigger heartbeatTrigger;
        private final CommandHandler   commandHandler;
        private final CommandFactory   commandFactory;

        /**
         * Creates the protocol together with its V2 encoder/decoder, command factory,
         * heartbeat trigger and command handler.
         */
        public RpcProtocolV2() {
            this.encoder = new RpcCommandEncoderV2();
            this.decoder = new RpcCommandDecoderV2();
            // The factory must exist first: the trigger and the handler both receive it.
            this.commandFactory = new RpcCommandFactory();
            this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
            this.commandHandler = new RpcCommandHandler(this.commandFactory);
        }

        /**
         * @return the fixed length, in bytes, of a request header
         */
        public static int getRequestHeaderLength() {
            return RpcProtocolV2.REQUEST_HEADER_LEN;
        }

        /**
         * @return the fixed length, in bytes, of a response header
         */
        public static int getResponseHeaderLength() {
            return RpcProtocolV2.RESPONSE_HEADER_LEN;
        }

        /**
         * @return the encoder created for this protocol instance
         */
        @Override
        public CommandEncoder getEncoder() {
            return this.encoder;
        }

        /**
         * @return the decoder created for this protocol instance
         */
        @Override
        public CommandDecoder getDecoder() {
            return this.decoder;
        }

        /**
         * @return the heartbeat trigger created for this protocol instance
         */
        @Override
        public HeartbeatTrigger getHeartbeatTrigger() {
            return this.heartbeatTrigger;
        }

        /**
         * @return the command handler created for this protocol instance
         */
        @Override
        public CommandHandler getCommandHandler() {
            return this.commandHandler;
        }

        /**
         * @return the command factory shared by the heartbeat trigger and the command handler
         */
        @Override
        public CommandFactory getCommandFactory() {
            return this.commandFactory;
        }
    }
    
    Command的编解码器
    顶层接口:CommandEncoder、CommandDecoder,对command解析编解码
    
    /**
     * Encodes a message object into its binary representation.
     */
    public interface CommandEncoder {

        /**
         * Encode object into bytes.
         *
         * @param ctx the channel handler context in which the encoding takes place
         * @param msg the object to encode
         * @param out the buffer that receives the encoded bytes
         * @throws Exception if the message cannot be encoded
         */
        void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception;

    }
    /**
     * Decodes bytes read from a buffer into message objects.
     */
    public interface CommandDecoder {
        /**
         * Decode bytes into object.
         *
         * @param ctx the channel handler context in which the decoding takes place
         * @param in  the buffer to read bytes from
         * @param out the list that decoded objects are added to
         * @throws Exception if the bytes cannot be decoded
         */
        void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    }
    
    CommandEncoder/CommandDecoder 子类实现
    RpcCommandEncoderV2/RpcCommandDecoderV2  按照协议将二进制转化为Command和将Command转化为二进制
    
    /**
     * Decoder for frames of {@link RpcProtocolV2}: reads one frame from the buffer and turns it
     * into a request or response command that is appended to the output list.
     *
     * <p>Whenever the buffer does not yet hold a complete frame, the reader index is rewound to
     * where it was and nothing is emitted.
     */
    public class RpcCommandDecoderV2 implements CommandDecoder {

        private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");

        /** The smaller of the request and response header lengths; decoding is not attempted below it. */
        private int                 lessLen;

        {
            lessLen = RpcProtocolV2.getResponseHeaderLength() < RpcProtocolV2.getRequestHeaderLength() ? RpcProtocolV2
                .getResponseHeaderLength() : RpcProtocolV2.getRequestHeaderLength();
        }

        /**
         * Decodes at most one command from {@code in}.
         *
         * @param ctx the channel handler context; its channel's remote address is recorded on responses
         * @param in  the buffer to read from
         * @param out receives the decoded command, if a complete frame was available
         * @throws Exception a {@link RuntimeException} is thrown for an unknown protocol code,
         *                   an unknown command type, or a CRC mismatch
         * @see CommandDecoder#decode(ChannelHandlerContext, ByteBuf, List)
         */
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // the less length between response header and request header
            if (in.readableBytes() >= lessLen) {
                // Peek at the protocol code without consuming it.
                in.markReaderIndex();
                byte protocol = in.readByte();
                in.resetReaderIndex();
                if (protocol == RpcProtocolV2.PROTOCOL_CODE) {
                    /*
                     * Fields following the protocol code, in wire order:
                     * ver: version for protocol
                     * type: request/response/request oneway
                     * cmdcode: code for remoting command
                     * ver2:version for remoting command
                     * requestId: id of request
                     * codec: code for codec
                     * switch: function switch
                     * (req)timeout: request timeout
                     * (resp)respStatus: response status
                     * classLen: length of request or response class name
                     * headerLen: length of header
                     * contentLen: length of content
                     * className
                     * header
                     * content
                     */
                    // More than the three leading bytes (protocol code, version, type) must be readable.
                    if (in.readableBytes() > 2 + 1) {
                        // Remember where the frame starts; the CRC is computed from this position.
                        int startIndex = in.readerIndex();
                        // Every resetReaderIndex() below rewinds to this mark, i.e. to the frame start.
                        in.markReaderIndex();
                        in.readByte(); //protocol code
                        byte version = in.readByte(); //protocol version
                        byte type = in.readByte(); //type
                        if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
                            //decode request
                            // Three header bytes are already consumed, so only the remainder is required.
                            if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) {
                                short cmdCode = in.readShort();
                                byte ver2 = in.readByte();
                                int requestId = in.readInt();
                                byte serializer = in.readByte();
                                byte protocolSwitchValue = in.readByte();
                                int timeout = in.readInt();
                                short classLen = in.readShort();
                                short headerLen = in.readShort();
                                int contentLen = in.readInt();
                                byte[] clazz = null;
                                byte[] header = null;
                                byte[] content = null;

                                // decide the at-least bytes length for each version
                                int lengthAtLeastForV1 = classLen + headerLen + contentLen;
                                boolean crcSwitchOn = ProtocolSwitch.isOn(
                                    ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
                                int lengthAtLeastForV2 = classLen + headerLen + contentLen;
                                if (crcSwitchOn) {
                                    lengthAtLeastForV2 += 4;// crc int
                                }

                                // continue read
                                // NOTE(review): a version other than 1 or 2 fails both clauses and is
                                // therefore treated like "not enough data" (rewind and return).
                                if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
                                    || (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in
                                        .readableBytes() >= lengthAtLeastForV2)) {
                                    // Sections with a non-positive length are left as null.
                                    if (classLen > 0) {
                                        clazz = new byte[classLen];
                                        in.readBytes(clazz);
                                    }
                                    if (headerLen > 0) {
                                        header = new byte[headerLen];
                                        in.readBytes(header);
                                    }
                                    if (contentLen > 0) {
                                        content = new byte[contentLen];
                                        in.readBytes(content);
                                    }
                                    // The CRC is only verified for protocol version 2 with the CRC switch on.
                                    if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
                                        checkCRC(in, startIndex);
                                    }
                                } else {// not enough data
                                    in.resetReaderIndex();
                                    return;
                                }
                                // Heartbeats get their own command class; everything else is an RPC request.
                                RequestCommand command;
                                if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
                                    command = new HeartbeatCommand();
                                } else {
                                    command = createRequestCommand(cmdCode);
                                }
                                command.setType(type);
                                command.setVersion(ver2);
                                command.setId(requestId);
                                command.setSerializer(serializer);
                                command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
                                command.setTimeout(timeout);
                                command.setClazz(clazz);
                                command.setHeader(header);
                                command.setContent(content);

                                out.add(command);
                            } else {
                                // Request header incomplete: rewind to the frame start.
                                in.resetReaderIndex();
                            }
                        } else if (type == RpcCommandType.RESPONSE) {
                            //decode response
                            // Three header bytes are already consumed, so only the remainder is required.
                            if (in.readableBytes() >= RpcProtocolV2.getResponseHeaderLength() - 3) {
                                short cmdCode = in.readShort();
                                byte ver2 = in.readByte();
                                int requestId = in.readInt();
                                byte serializer = in.readByte();
                                byte protocolSwitchValue = in.readByte();
                                short status = in.readShort();
                                short classLen = in.readShort();
                                short headerLen = in.readShort();
                                int contentLen = in.readInt();
                                byte[] clazz = null;
                                byte[] header = null;
                                byte[] content = null;

                                // decide the at-least bytes length for each version
                                int lengthAtLeastForV1 = classLen + headerLen + contentLen;
                                boolean crcSwitchOn = ProtocolSwitch.isOn(
                                    ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
                                int lengthAtLeastForV2 = classLen + headerLen + contentLen;
                                if (crcSwitchOn) {
                                    lengthAtLeastForV2 += 4;// crc int
                                }

                                // continue read
                                // NOTE(review): a version other than 1 or 2 fails both clauses and is
                                // therefore treated like "not enough data" (rewind and return).
                                if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
                                    || (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in
                                        .readableBytes() >= lengthAtLeastForV2)) {
                                    // Sections with a non-positive length are left as null.
                                    if (classLen > 0) {
                                        clazz = new byte[classLen];
                                        in.readBytes(clazz);
                                    }
                                    if (headerLen > 0) {
                                        header = new byte[headerLen];
                                        in.readBytes(header);
                                    }
                                    if (contentLen > 0) {
                                        content = new byte[contentLen];
                                        in.readBytes(content);
                                    }
                                    // The CRC is only verified for protocol version 2 with the CRC switch on.
                                    if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
                                        checkCRC(in, startIndex);
                                    }
                                } else {// not enough data
                                    in.resetReaderIndex();
                                    return;
                                }
                                // Heartbeat acks get their own command class; everything else is an RPC response.
                                ResponseCommand command;
                                if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
                                    command = new HeartbeatAckCommand();
                                } else {
                                    command = createResponseCommand(cmdCode);
                                }
                                command.setType(type);
                                command.setVersion(ver2);
                                command.setId(requestId);
                                command.setSerializer(serializer);
                                command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
                                command.setResponseStatus(ResponseStatus.valueOf(status));
                                command.setClazz(clazz);
                                command.setHeader(header);
                                command.setContent(content);
                                // Stamp the response with the decode time and the peer it came from.
                                command.setResponseTimeMillis(System.currentTimeMillis());
                                command.setResponseHost((InetSocketAddress) ctx.channel()
                                    .remoteAddress());

                                out.add(command);
                            } else {
                                // Response header incomplete: rewind to the frame start.
                                in.resetReaderIndex();
                            }
                        } else {
                            String emsg = "Unknown command type: " + type;
                            logger.error(emsg);
                            throw new RuntimeException(emsg);
                        }
                    }

                } else {
                    String emsg = "Unknown protocol: " + protocol;
                    logger.error(emsg);
                    throw new RuntimeException(emsg);
                }

            }
        }

        /**
         * Verifies the CRC32 that trails a frame.
         *
         * <p>Reads a 4-byte int at the current reader index as the expected value and compares
         * it with the CRC32 of the bytes from {@code startIndex} up to, but not including, that int.
         *
         * @param in         the buffer, positioned directly after the frame content
         * @param startIndex the reader index at which the frame started
         * @throws RuntimeException if the two values differ
         */
        private void checkCRC(ByteBuf in, int startIndex) {
            int endIndex = in.readerIndex();
            int expectedCrc = in.readInt();
            byte[] frame = new byte[endIndex - startIndex];
            // Absolute read: copies the frame without moving the reader index.
            in.getBytes(startIndex, frame, 0, endIndex - startIndex);
            int actualCrc = CrcUtil.crc32(frame);
            if (expectedCrc != actualCrc) {
                String err = "CRC check failed!";
                logger.error(err);
                throw new RuntimeException(err);
            }
        }

        /**
         * Creates an empty RPC response command carrying the given command code.
         *
         * @param cmdCode the command code read from the frame
         * @return the new response command
         */
        private ResponseCommand createResponseCommand(short cmdCode) {
            ResponseCommand command = new RpcResponseCommand();
            command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
            return command;
        }

        /**
         * Creates an empty RPC request command carrying the given command code and stamps it
         * with the current time as its arrive time.
         *
         * @param cmdCode the command code read from the frame
         * @return the new request command
         */
        private RpcRequestCommand createRequestCommand(short cmdCode) {
            RpcRequestCommand command = new RpcRequestCommand();
            command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
            command.setArriveTime(System.currentTimeMillis());
            return command;
        }
    }
    
    
    /**
     * Encoder for frames of {@link RpcProtocolV2}: serializes an {@code RpcCommand} into the
     * output buffer, optionally followed by a CRC32 of the frame.
     */
    public class RpcCommandEncoderV2 implements CommandEncoder {
        /** logger */
        private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");

        /**
         * Writes {@code msg} to {@code out} in protocol V2 layout. Messages that are not an
         * {@code RpcCommand} are only logged with a warning and nothing is written.
         *
         * @param ctx the channel handler context; the protocol version is taken from the channel's
         *            {@code Connection.VERSION} attribute and defaults to version 1 when unset
         * @param msg the message to encode
         * @param out the buffer the frame is appended to
         * @throws Exception any exception raised while encoding is logged and rethrown
         * @see CommandEncoder#encode(ChannelHandlerContext, Serializable, ByteBuf)
         */
        @Override
        public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
            try {
                if (msg instanceof RpcCommand) {
                    /*
                     * proto: magic code for protocol
                     * ver: version for protocol
                     * type: request/response/request oneway
                     * cmdcode: code for remoting command
                     * ver2:version for remoting command
                     * requestId: id of request
                     * codec: code for codec
                     * switch: function switch
                     * (req)timeout: request timeout.
                     * (resp)respStatus: response status
                     * classLen: length of request or response class name
                     * headerLen: length of header
                     * contentLen: length of content
                     * className
                     * header
                     * content
                     * crc (optional)
                     */
                    // Start of this frame inside the buffer; the CRC is computed from here.
                    int index = out.writerIndex();
                    RpcCommand cmd = (RpcCommand) msg;
                    out.writeByte(RpcProtocolV2.PROTOCOL_CODE);
                    Attribute<Byte> version = ctx.channel().attr(Connection.VERSION);
                    byte ver = RpcProtocolV2.PROTOCOL_VERSION_1;
                    if (version != null && version.get() != null) {
                        ver = version.get();
                    }
                    out.writeByte(ver);
                    out.writeByte(cmd.getType());
                    out.writeShort(cmd.getCmdCode().value());
                    out.writeByte(cmd.getVersion());
                    out.writeInt(cmd.getId());
                    out.writeByte(cmd.getSerializer());
                    out.writeByte(cmd.getProtocolSwitch().toByte());
                    if (cmd instanceof RequestCommand) {
                        //timeout
                        out.writeInt(((RequestCommand) cmd).getTimeout());
                    }
                    if (cmd instanceof ResponseCommand) {
                        //response status
                        ResponseCommand response = (ResponseCommand) cmd;
                        out.writeShort(response.getResponseStatus().getValue());
                    }
                    out.writeShort(cmd.getClazzLength());
                    out.writeShort(cmd.getHeaderLength());
                    out.writeInt(cmd.getContentLength());
                    if (cmd.getClazzLength() > 0) {
                        out.writeBytes(cmd.getClazz());
                    }
                    if (cmd.getHeaderLength() > 0) {
                        out.writeBytes(cmd.getHeader());
                    }
                    if (cmd.getContentLength() > 0) {
                        out.writeBytes(cmd.getContent());
                    }
                    if (ver == RpcProtocolV2.PROTOCOL_VERSION_2
                        && cmd.getProtocolSwitch().isOn(ProtocolSwitch.CRC_SWITCH_INDEX)) {
                        // compute the crc32 and write to out
                        // The frame is exactly what this call wrote: [index, writerIndex).
                        // Sizing it with readableBytes() would be too long whenever the buffer
                        // already held bytes before index, making the copy below run past the
                        // writer index.
                        byte[] frame = new byte[out.writerIndex() - index];
                        out.getBytes(index, frame);
                        out.writeInt(CrcUtil.crc32(frame));
                    }
                } else {
                    String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of RpcCommand";
                    logger.warn(warnMsg);
                }
            } catch (Exception e) {
                logger.error("Exception caught!", e);
                throw e;
            }
        }
    }
    
    
    CommandHandler
    对命令的处理逻辑,注册Command的执行逻辑,注册默认的执行线程池
    
    
    /**
     * Handles decoded commands and manages the processors and the default executor used for it.
     */
    public interface CommandHandler {
        /**
         * Handle the command.
         *
         * @param ctx the remoting context the command is handled in
         * @param msg the message to handle
         * @throws Exception if handling fails
         */
        void handleCommand(RemotingContext ctx, Object msg) throws Exception;

        /**
         * Register processor for command with specified code.
         *
         * @param cmd       the command code the processor is registered for
         * @param processor the processor to register
         */
        void registerProcessor(CommandCode cmd, RemotingProcessor<?> processor);

        /**
         * Register default executor for the handler.
         *
         * @param executor the executor to register as default
         */
        void registerDefaultExecutor(ExecutorService executor);

        /**
         * Get default executor for the handler.
         *
         * @return the default executor
         */
        ExecutorService getDefaultExecutor();
    }
    
    
    CommandHandler的默认实现类
    RpcCommandHandler 包含一个处理器管理类 ProcessorManager 和 CommandFactory
    其构造时会构造 ProcessorManager,注册请求处理器、响应处理器、心跳处理器以及默认处理器
    该类使用@Sharable注解,是channel共享的handler
    
    /**
     * Default {@link CommandHandler}: looks up the processor registered for a command's code
     * and hands the command to it, and answers with a "thread pool busy" response when a
     * non-oneway request is rejected by the executor.
     */
    @Sharable
    public class RpcCommandHandler implements CommandHandler {

        private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
        /** All processors */
        ProcessorManager            processorManager;

        /** Factory used to build the exception responses sent back to the peer. */
        CommandFactory              commandFactory;

        /**
         * Constructor. Initialize the processor manager and register processors.
         *
         * <p>Registers processors for RPC requests, RPC responses and heartbeats, plus a default
         * processor that only logs an error for command codes without a processor.
         *
         * @param commandFactory the factory used to create response commands
         */
        public RpcCommandHandler(CommandFactory commandFactory) {
            this.commandFactory = commandFactory;
            this.processorManager = new ProcessorManager();
            //process request
            this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST,
                new RpcRequestProcessor(this.commandFactory));
            //process response
            this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE,
                new RpcResponseProcessor());

            this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT,
                new RpcHeartBeatProcessor());

            this.processorManager
                .registerDefaultProcessor(new AbstractRemotingProcessor<RemotingCommand>() {
                    @Override
                    public void doProcess(RemotingContext ctx, RemotingCommand msg) throws Exception {
                        logger.error("No processor available for command code {}, msgId {}",
                            msg.getCmdCode(), msg.getId());
                    }
                });
        }

        /**
         * @see CommandHandler#handleCommand(RemotingContext, Object)
         */
        @Override
        public void handleCommand(RemotingContext ctx, Object msg) throws Exception {
            this.handle(ctx, msg);
        }

        /*
         * Handle the request(s).
         *
         * A List is treated as a batch: its elements are processed one by one, either on the
         * default executor or on the calling thread depending on configuration. Anything else
         * is processed as a single command.
         */
        private void handle(final RemotingContext ctx, final Object msg) {
            try {
                if (msg instanceof List) {
                    final Runnable handleTask = new Runnable() {
                        @Override
                        public void run() {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Batch message! size={}", ((List<?>) msg).size());
                            }
                            for (final Object m : (List<?>) msg) {
                                RpcCommandHandler.this.process(ctx, m);
                            }
                        }
                    };
                    if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
                        // If msg is list ,then the batch submission to biz threadpool can save io thread.
                        // See com.alipay.remoting.decoder.ProtocolDecoder
                        processorManager.getDefaultExecutor().execute(handleTask);
                    } else {
                        handleTask.run();
                    }
                } else {
                    process(ctx, msg);
                }
            } catch (final Throwable t) {
                processException(ctx, msg, t);
            }
        }

        /*
         * Dispatch a single command to the processor registered for its command code.
         * Any failure is routed to processException.
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
        private void process(RemotingContext ctx, Object msg) {
            try {
                final RpcCommand cmd = (RpcCommand) msg;
                final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());
                processor.process(ctx, cmd, processorManager.getDefaultExecutor());
            } catch (final Throwable t) {
                processException(ctx, msg, t);
            }
        }

        /*
         * Apply the single-command exception handling to msg, or to each element when msg is a List.
         */
        private void processException(RemotingContext ctx, Object msg, Throwable t) {
            if (msg instanceof List) {
                for (final Object m : (List<?>) msg) {
                    processExceptionForSingleCommand(ctx, m, t);
                }
            } else {
                processExceptionForSingleCommand(ctx, msg, t);
            }
        }

        /*
         * Return error command if necessary.
         *
         * Always logs the failure. A response is written back only when the failed message is a
         * non-oneway request and the failure is a RejectedExecutionException.
         */
        private void processExceptionForSingleCommand(RemotingContext ctx, Object msg, Throwable t) {
            if (!(msg instanceof RpcCommand)) {
                // No id can be extracted from a null or non-command message. Casting it anyway
                // would raise a second exception from inside the error-handling path, so the
                // failure is only logged.
                final String msgType = (msg == null) ? "null" : msg.getClass().getName();
                logger.warn("Exception caught when processing message of type " + msgType, t);
                return;
            }
            final int id = ((RpcCommand) msg).getId();
            final String emsg = "Exception caught when processing "
                                + ((msg instanceof RequestCommand) ? "request, id=" : "response, id=");
            logger.warn(emsg + id, t);
            if (msg instanceof RequestCommand) {
                final RequestCommand cmd = (RequestCommand) msg;
                if (cmd.getType() != RpcCommandType.REQUEST_ONEWAY) {
                    if (t instanceof RejectedExecutionException) {
                        final ResponseCommand response = this.commandFactory.createExceptionResponse(
                            id, ResponseStatus.SERVER_THREADPOOL_BUSY);
                        // RejectedExecutionException here assures no response has been sent back
                        // Other exceptions should be processed where exception was caught, because here we don't known whether ack had been sent back.
                        ctx.getChannelContext().writeAndFlush(response)
                            .addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    if (future.isSuccess()) {
                                        if (logger.isInfoEnabled()) {
                                            logger
                                                .info(
                                                    "Write back exception response done, requestId={}, status={}",
                                                    id, response.getResponseStatus());
                                        }
                                    } else {
                                        logger.error(
                                            "Write back exception response failed, requestId={}", id,
                                            future.cause());
                                    }
                                }

                            });
                    }
                }
            }
        }

        /**
         * @see CommandHandler#registerProcessor(com.alipay.remoting.CommandCode, RemotingProcessor)
         */
        @Override
        public void registerProcessor(CommandCode cmd,
                                      @SuppressWarnings("rawtypes") RemotingProcessor processor) {
            this.processorManager.registerProcessor(cmd, processor);
        }

        /**
         * @see CommandHandler#registerDefaultExecutor(java.util.concurrent.ExecutorService)
         */
        @Override
        public void registerDefaultExecutor(ExecutorService executor) {
            this.processorManager.registerDefaultExecutor(executor);
        }

        /**
         * @see CommandHandler#getDefaultExecutor()
         */
        @Override
        public ExecutorService getDefaultExecutor() {
            return this.processorManager.getDefaultExecutor();
        }
    }
    
    
    命令的工厂类
    CommandFactory 提供请求类型命令的创建,响应类型命令的创建 ,异常响应创建,超时响应创建,发送失败响应创建
    
    /**
     * Factory for remoting commands: request commands, normal responses, and the various
     * failure responses (exception, timeout, send failed, connection closed).
     */
    public interface CommandFactory {
        // ~~~ create request command

        /**
         * create a request command with request object
         *
         * @param requestObject the request object included in request command
         * @param <T> the concrete command type returned
         * @return the created request command
         */
        <T extends RemotingCommand> T createRequestCommand(final Object requestObject);

        // ~~~ create response command

        /**
         * create a normal response with response object
         *
         * @param responseObject the response object included in the response command
         * @param requestCmd the request command that is being answered
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createResponse(final Object responseObject,
                                                     RemotingCommand requestCmd);

        /**
         * create an exception response carrying an error message
         *
         * @param id the id to set on the response
         * @param errMsg the error message
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createExceptionResponse(int id, String errMsg);

        /**
         * create an exception response carrying a throwable and an error message
         *
         * @param id the id to set on the response
         * @param t the throwable describing the failure
         * @param errMsg the error message
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createExceptionResponse(int id, final Throwable t, String errMsg);

        /**
         * create an exception response with the given response status
         *
         * @param id the id to set on the response
         * @param status the response status to set
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createExceptionResponse(int id, ResponseStatus status);

        /**
         * create an exception response with the given response status and a throwable
         *
         * @param id the id to set on the response
         * @param status the response status to set
         * @param t the throwable describing the failure
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createExceptionResponse(int id, ResponseStatus status,
                                                              final Throwable t);

        /**
         * create a timeout response
         *
         * @param address the address recorded on the response as its host
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createTimeoutResponse(final InetSocketAddress address);

        /**
         * create a response for a request that failed to be sent
         *
         * @param address presumably the address of the remote peer; confirm against the implementation
         * @param throwable the cause of the send failure
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createSendFailedResponse(final InetSocketAddress address,
                                                               Throwable throwable);

        /**
         * create a response signalling that the connection was closed
         *
         * @param address presumably the address of the remote peer; confirm against the implementation
         * @param message a message describing the closure
         * @param <T> the concrete command type returned
         * @return the created response command
         */
        <T extends RemotingCommand> T createConnectionClosedResponse(final InetSocketAddress address,
                                                                     String message);
    }
    
    其实现子类:
    /**
     * {@link CommandFactory} implementation that builds the request and response commands
     * used by the RPC protocol.
     */
    public class RpcCommandFactory implements CommandFactory {

        /**
         * Wraps the given request object in a new {@link RpcRequestCommand}.
         *
         * @param requestObject the request payload
         * @return a request command carrying {@code requestObject}
         */
        @Override
        public RpcRequestCommand createRequestCommand(Object requestObject) {
            return new RpcRequestCommand(requestObject);
        }

        /**
         * Builds a successful response for the given request.
         * The response reuses the id, serializer and protocol switch of the request command.
         *
         * @param responseObject the response payload, <code>null</code> is allowed
         * @param requestCmd the request command being answered
         * @return a response command with status {@link ResponseStatus#SUCCESS}
         */
        @Override
        public RpcResponseCommand createResponse(final Object responseObject,
                                                 final RemotingCommand requestCmd) {
            RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
            // the response class is the runtime class name of the payload, or null when there is no payload
            response.setResponseClass(null != responseObject ? responseObject.getClass().getName()
                : null);
            response.setSerializer(requestCmd.getSerializer());
            response.setProtocolSwitch(requestCmd.getProtocolSwitch());
            response.setResponseStatus(ResponseStatus.SUCCESS);
            return response;
        }

        /**
         * Builds a server exception response that only carries an error message.
         *
         * @param id the id to set on the response
         * @param errMsg the error message
         * @return a response command with status {@link ResponseStatus#SERVER_EXCEPTION}
         */
        @Override
        public RpcResponseCommand createExceptionResponse(int id, String errMsg) {
            return createExceptionResponse(id, null, errMsg);
        }

        /**
         * Builds a server exception response whose payload is an {@link RpcServerException}.
         *
         * @param id the id to set on the response
         * @param t the origin throwable, <code>null</code> is allowed (message-only exception is used then)
         * @param errMsg additional error message
         * @return a response command with status {@link ResponseStatus#SERVER_EXCEPTION}
         */
        @Override
        public RpcResponseCommand createExceptionResponse(int id, final Throwable t, String errMsg) {
            // createServerException(Throwable, String) handles the null throwable case itself
            RpcResponseCommand response = new RpcResponseCommand(id, createServerException(t,
                errMsg));
            response.setResponseClass(RpcServerException.class.getName());
            response.setResponseStatus(ResponseStatus.SERVER_EXCEPTION);
            return response;
        }

        /**
         * Builds a response that only carries an id and a status, without any payload.
         *
         * @param id the id to set on the response
         * @param status the status to set on the response
         * @return a response command with the given id and status
         */
        @Override
        public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status) {
            RpcResponseCommand responseCommand = new RpcResponseCommand();
            responseCommand.setId(id);
            responseCommand.setResponseStatus(status);
            return responseCommand;
        }

        /**
         * Builds a response with the given id and status whose payload is an
         * {@link RpcServerException} derived from the throwable.
         *
         * @param id the id to set on the response
         * @param status the status to set on the response
         * @param t the origin throwable, <code>null</code> is allowed
         * @return a response command with the given id and status and an exception payload
         */
        @Override
        public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status, Throwable t) {
            RpcResponseCommand responseCommand = this.createExceptionResponse(id, status);
            // a null throwable no longer fails here, see createServerException(Throwable, String)
            responseCommand.setResponseObject(createServerException(t, null));
            responseCommand.setResponseClass(RpcServerException.class.getName());
            return responseCommand;
        }

        /**
         * Builds a response marking a timeout, stamped with the current time.
         *
         * @param address the address set as response host
         * @return a response command with status {@link ResponseStatus#TIMEOUT}
         */
        @Override
        public ResponseCommand createTimeoutResponse(InetSocketAddress address) {
            ResponseCommand responseCommand = new ResponseCommand();
            responseCommand.setResponseStatus(ResponseStatus.TIMEOUT);
            responseCommand.setResponseTimeMillis(System.currentTimeMillis());
            responseCommand.setResponseHost(address);
            return responseCommand;
        }

        /**
         * Builds a response marking a client side send failure, stamped with the current time.
         *
         * @param address the address set as response host
         * @param throwable the failure, stored as the cause of the response
         * @return a response command with status {@link ResponseStatus#CLIENT_SEND_ERROR}
         */
        @Override
        public RemotingCommand createSendFailedResponse(final InetSocketAddress address,
                                                        Throwable throwable) {
            ResponseCommand responseCommand = new ResponseCommand();
            responseCommand.setResponseStatus(ResponseStatus.CLIENT_SEND_ERROR);
            responseCommand.setResponseTimeMillis(System.currentTimeMillis());
            responseCommand.setResponseHost(address);
            responseCommand.setCause(throwable);
            return responseCommand;
        }

        /**
         * Builds a response marking a closed connection, stamped with the current time.
         *
         * @param address the address set as response host
         * @param message currently not attached to the response
         * @return a response command with status {@link ResponseStatus#CONNECTION_CLOSED}
         */
        @Override
        public RemotingCommand createConnectionClosedResponse(InetSocketAddress address, String message) {
            // NOTE(review): message is accepted but never propagated to the response - confirm whether intended
            ResponseCommand responseCommand = new ResponseCommand();
            responseCommand.setResponseStatus(ResponseStatus.CONNECTION_CLOSED);
            responseCommand.setResponseTimeMillis(System.currentTimeMillis());
            responseCommand.setResponseHost(address);
            return responseCommand;
        }

        /**
         * create server exception using error msg only; no origin stack trace is copied
         * @param errMsg the assigned error message
         * @return an instance of RpcServerException
         */
        private RpcServerException createServerException(String errMsg) {
            return new RpcServerException(errMsg);
        }

        /**
         * create server exception using error msg and fill the stack trace using the stack trace of throwable.
         * When the throwable is <code>null</code>, falls back to the message-only variant.
         *
         * @param t the origin throwable to fill the stack trace of rpc server exception, <code>null</code> is allowed
         * @param errMsg additional error msg, <code>null</code> is allowed
         * @return an instance of RpcServerException
         */
        private RpcServerException createServerException(Throwable t, String errMsg) {
            if (null == t) {
                // nothing to copy a stack trace from; avoid a NullPointerException on t
                return createServerException(errMsg);
            }
            String formattedErrMsg = String.format(
                "[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s", t.getClass().getName(),
                t.getMessage(), errMsg);
            RpcServerException e = new RpcServerException(formattedErrMsg);
            // expose the origin failure location instead of this factory method
            e.setStackTrace(t.getStackTrace());
            return e;
        }
    }
    
    

    相关文章

      网友评论

          本文标题:1.sofa-bolt协议

          本文链接:https://www.haomeiwen.com/subject/nvjsdktx.html