sofa-bolt协议
协议使用私有二进制协议,bolt协议在第一个版本中忽略的协议的版本,所以设计了第二个版本的协议,增加了协议的版本号,
其协议构成如下:
* 0 1 2 3 4 6 8 10 11 12 14 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
* |proto| ver1| type| cmdcode |ver2 | requestId |codec|switch|respstatus | classLen |
* +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
* |headerLen | contentLen | ... |
* +-----------------------------------+ +
* | className + header + content bytes |
* + +
* | ... ... | CRC32(optional) |
* +------------------------------------------------------------------------------------------------+
协议的顶层接口:Protocol
包含了 Command的Encoder/Decoder,CommandHandler,CommandFactory
将二进制按照上述协议构成解析为Command,以及将Command编码成二进制,包含了command的处理器和command的创建工厂
协议子类实现 RpcProtocolV2
public class RpcProtocolV2 implements Protocol {
/* because the design defect, the version is neglected in RpcProtocol, so we design RpcProtocolV2 and add protocol version. */
public static final byte PROTOCOL_CODE = (byte) 2;
/** version 1, is the same with RpcProtocol */
public static final byte PROTOCOL_VERSION_1 = (byte) 1;
/** version 2, is the protocol version for RpcProtocolV2 */
public static final byte PROTOCOL_VERSION_2 = (byte) 2;
/**
* in contrast to protocol v1,
* one more byte is used as protocol version,
* and another one is userd as protocol switch
*/
private static final int REQUEST_HEADER_LEN = 22 + 2;
private static final int RESPONSE_HEADER_LEN = 20 + 2;
private CommandEncoder encoder;
private CommandDecoder decoder;
private HeartbeatTrigger heartbeatTrigger;
private CommandHandler commandHandler;
private CommandFactory commandFactory;
public RpcProtocolV2() {
this.encoder = new RpcCommandEncoderV2();
this.decoder = new RpcCommandDecoderV2();
this.commandFactory = new RpcCommandFactory();
this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
this.commandHandler = new RpcCommandHandler(this.commandFactory);
}
public static int getRequestHeaderLength() {
return RpcProtocolV2.REQUEST_HEADER_LEN;
}
public static int getResponseHeaderLength() {
return RpcProtocolV2.RESPONSE_HEADER_LEN;
}
@Override
public CommandEncoder getEncoder() {
return this.encoder;
}
@Override
public CommandDecoder getDecoder() {
return this.decoder;
}
@Override
public HeartbeatTrigger getHeartbeatTrigger() {
return this.heartbeatTrigger;
}
@Override
public CommandHandler getCommandHandler() {
return this.commandHandler;
}
@Override
public CommandFactory getCommandFactory() {
return this.commandFactory;
}
}
Command的编解码器
顶层接口:CommandEncoder、CommandDecoder,对command解析编解码
public interface CommandEncoder {
/**
* Encode object into bytes.
*
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception;
}
public interface CommandDecoder {
/**
* Decode bytes into object.
*
* @param ctx
* @param in
* @param out
* @throws Exception
*/
void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
CommandEncoder/CommandDecoder 子类实现
RpcCommandEncoderV2/RpcCommandDecoderV2 按照协议将二进制转化为Command和将Command转化为二进制
public class RpcCommandDecoderV2 implements CommandDecoder {
private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
private int lessLen;
{
lessLen = RpcProtocolV2.getResponseHeaderLength() < RpcProtocolV2.getRequestHeaderLength() ? RpcProtocolV2
.getResponseHeaderLength() : RpcProtocolV2.getRequestHeaderLength();
}
/**
* @see CommandDecoder#decode(ChannelHandlerContext, ByteBuf, List)
*/
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// the less length between response header and request header
if (in.readableBytes() >= lessLen) {
in.markReaderIndex();
byte protocol = in.readByte();
in.resetReaderIndex();
if (protocol == RpcProtocolV2.PROTOCOL_CODE) {
/*
* ver: version for protocol
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* switch: function switch
* (req)timeout: request timeout
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* contentLen: length of content
* className
* header
* content
*/
if (in.readableBytes() > 2 + 1) {
int startIndex = in.readerIndex();
in.markReaderIndex();
in.readByte(); //protocol code
byte version = in.readByte(); //protocol version
byte type = in.readByte(); //type
if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
//decode request
if (in.readableBytes() >= RpcProtocolV2.getRequestHeaderLength() - 3) {
short cmdCode = in.readShort();
byte ver2 = in.readByte();
int requestId = in.readInt();
byte serializer = in.readByte();
byte protocolSwitchValue = in.readByte();
int timeout = in.readInt();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
// decide the at-least bytes length for each version
int lengthAtLeastForV1 = classLen + headerLen + contentLen;
boolean crcSwitchOn = ProtocolSwitch.isOn(
ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
int lengthAtLeastForV2 = classLen + headerLen + contentLen;
if (crcSwitchOn) {
lengthAtLeastForV2 += 4;// crc int
}
// continue read
if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
|| (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in
.readableBytes() >= lengthAtLeastForV2)) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
checkCRC(in, startIndex);
}
} else {// not enough data
in.resetReaderIndex();
return;
}
RequestCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatCommand();
} else {
command = createRequestCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
command.setTimeout(timeout);
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
out.add(command);
} else {
in.resetReaderIndex();
}
} else if (type == RpcCommandType.RESPONSE) {
//decode response
if (in.readableBytes() >= RpcProtocolV2.getResponseHeaderLength() - 3) {
short cmdCode = in.readShort();
byte ver2 = in.readByte();
int requestId = in.readInt();
byte serializer = in.readByte();
byte protocolSwitchValue = in.readByte();
short status = in.readShort();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
// decide the at-least bytes length for each version
int lengthAtLeastForV1 = classLen + headerLen + contentLen;
boolean crcSwitchOn = ProtocolSwitch.isOn(
ProtocolSwitch.CRC_SWITCH_INDEX, protocolSwitchValue);
int lengthAtLeastForV2 = classLen + headerLen + contentLen;
if (crcSwitchOn) {
lengthAtLeastForV2 += 4;// crc int
}
// continue read
if ((version == RpcProtocolV2.PROTOCOL_VERSION_1 && in.readableBytes() >= lengthAtLeastForV1)
|| (version == RpcProtocolV2.PROTOCOL_VERSION_2 && in
.readableBytes() >= lengthAtLeastForV2)) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
if (version == RpcProtocolV2.PROTOCOL_VERSION_2 && crcSwitchOn) {
checkCRC(in, startIndex);
}
} else {// not enough data
in.resetReaderIndex();
return;
}
ResponseCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatAckCommand();
} else {
command = createResponseCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setProtocolSwitch(ProtocolSwitch.create(protocolSwitchValue));
command.setResponseStatus(ResponseStatus.valueOf(status));
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
command.setResponseTimeMillis(System.currentTimeMillis());
command.setResponseHost((InetSocketAddress) ctx.channel()
.remoteAddress());
out.add(command);
} else {
in.resetReaderIndex();
}
} else {
String emsg = "Unknown command type: " + type;
logger.error(emsg);
throw new RuntimeException(emsg);
}
}
} else {
String emsg = "Unknown protocol: " + protocol;
logger.error(emsg);
throw new RuntimeException(emsg);
}
}
}
private void checkCRC(ByteBuf in, int startIndex) {
int endIndex = in.readerIndex();
int expectedCrc = in.readInt();
byte[] frame = new byte[endIndex - startIndex];
in.getBytes(startIndex, frame, 0, endIndex - startIndex);
int actualCrc = CrcUtil.crc32(frame);
if (expectedCrc != actualCrc) {
String err = "CRC check failed!";
logger.error(err);
throw new RuntimeException(err);
}
}
private ResponseCommand createResponseCommand(short cmdCode) {
ResponseCommand command = new RpcResponseCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
return command;
}
private RpcRequestCommand createRequestCommand(short cmdCode) {
RpcRequestCommand command = new RpcRequestCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
command.setArriveTime(System.currentTimeMillis());
return command;
}
}
public class RpcCommandEncoderV2 implements CommandEncoder {
/** logger */
private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
/**
* @see CommandEncoder#encode(ChannelHandlerContext, Serializable, ByteBuf)
*/
@Override
public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
try {
if (msg instanceof RpcCommand) {
/*
* proto: magic code for protocol
* ver: version for protocol
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* switch: function switch
* (req)timeout: request timeout.
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* cotentLen: length of content
* className
* header
* content
* crc (optional)
*/
int index = out.writerIndex();
RpcCommand cmd = (RpcCommand) msg;
out.writeByte(RpcProtocolV2.PROTOCOL_CODE);
Attribute<Byte> version = ctx.channel().attr(Connection.VERSION);
byte ver = RpcProtocolV2.PROTOCOL_VERSION_1;
if (version != null && version.get() != null) {
ver = version.get();
}
out.writeByte(ver);
out.writeByte(cmd.getType());
out.writeShort(((RpcCommand) msg).getCmdCode().value());
out.writeByte(cmd.getVersion());
out.writeInt(cmd.getId());
out.writeByte(cmd.getSerializer());
out.writeByte(cmd.getProtocolSwitch().toByte());
if (cmd instanceof RequestCommand) {
//timeout
out.writeInt(((RequestCommand) cmd).getTimeout());
}
if (cmd instanceof ResponseCommand) {
//response status
ResponseCommand response = (ResponseCommand) cmd;
out.writeShort(response.getResponseStatus().getValue());
}
out.writeShort(cmd.getClazzLength());
out.writeShort(cmd.getHeaderLength());
out.writeInt(cmd.getContentLength());
if (cmd.getClazzLength() > 0) {
out.writeBytes(cmd.getClazz());
}
if (cmd.getHeaderLength() > 0) {
out.writeBytes(cmd.getHeader());
}
if (cmd.getContentLength() > 0) {
out.writeBytes(cmd.getContent());
}
if (ver == RpcProtocolV2.PROTOCOL_VERSION_2
&& cmd.getProtocolSwitch().isOn(ProtocolSwitch.CRC_SWITCH_INDEX)) {
// compute the crc32 and write to out
byte[] frame = new byte[out.readableBytes()];
out.getBytes(index, frame);
out.writeInt(CrcUtil.crc32(frame));
}
} else {
String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of RpcCommand";
logger.warn(warnMsg);
}
} catch (Exception e) {
logger.error("Exception caught!", e);
throw e;
}
}
}
CommandHandler
对命令的处理逻辑,注册Command的执行逻辑,注册默认的执行线程池
public interface CommandHandler {
/**
* Handle the command.
*
* @param ctx
* @param msg
* @throws Exception
*/
void handleCommand(RemotingContext ctx, Object msg) throws Exception;
/**
* Register processor for command with specified code.
*
* @param cmd
* @param processor
*/
void registerProcessor(CommandCode cmd, RemotingProcessor<?> processor);
/**
* Register default executor for the handler.
*
* @param executor
*/
void registerDefaultExecutor(ExecutorService executor);
/**
* Get default executor for the handler.
*/
ExecutorService getDefaultExecutor();
}
CommandHandler的默认实现类
RpcCommandHandler 包含一个处理器管理类,ProcessorManger 和CommandFactory
其构造时会构造ProcessManger ,注册请求处理器,响应处理器,心跳处理器
该列使用@Sharable注解,是channel共享的handler
@Sharable
public class RpcCommandHandler implements CommandHandler {
private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
/** All processors */
ProcessorManager processorManager;
CommandFactory commandFactory;
/**
* Constructor. Initialize the processor manager and register processors.
*/
public RpcCommandHandler(CommandFactory commandFactory) {
this.commandFactory = commandFactory;
this.processorManager = new ProcessorManager();
//process request
this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST,
new RpcRequestProcessor(this.commandFactory));
//process response
this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE,
new RpcResponseProcessor());
this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT,
new RpcHeartBeatProcessor());
this.processorManager
.registerDefaultProcessor(new AbstractRemotingProcessor<RemotingCommand>() {
@Override
public void doProcess(RemotingContext ctx, RemotingCommand msg) throws Exception {
logger.error("No processor available for command code {}, msgId {}",
msg.getCmdCode(), msg.getId());
}
});
}
/**
* @see CommandHandler#handleCommand(RemotingContext, Object)
*/
@Override
public void handleCommand(RemotingContext ctx, Object msg) throws Exception {
this.handle(ctx, msg);
}
/*
* Handle the request(s).
*/
private void handle(final RemotingContext ctx, final Object msg) {
try {
if (msg instanceof List) {
final Runnable handleTask = new Runnable() {
@Override
public void run() {
if (logger.isDebugEnabled()) {
logger.debug("Batch message! size={}", ((List<?>) msg).size());
}
for (final Object m : (List<?>) msg) {
RpcCommandHandler.this.process(ctx, m);
}
}
};
if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
// If msg is list ,then the batch submission to biz threadpool can save io thread.
// See com.alipay.remoting.decoder.ProtocolDecoder
processorManager.getDefaultExecutor().execute(handleTask);
} else {
handleTask.run();
}
} else {
process(ctx, msg);
}
} catch (final Throwable t) {
processException(ctx, msg, t);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void process(RemotingContext ctx, Object msg) {
try {
final RpcCommand cmd = (RpcCommand) msg;
final RemotingProcessor processor = processorManager.getProcessor(cmd.getCmdCode());
processor.process(ctx, cmd, processorManager.getDefaultExecutor());
} catch (final Throwable t) {
processException(ctx, msg, t);
}
}
private void processException(RemotingContext ctx, Object msg, Throwable t) {
if (msg instanceof List) {
for (final Object m : (List<?>) msg) {
processExceptionForSingleCommand(ctx, m, t);
}
} else {
processExceptionForSingleCommand(ctx, msg, t);
}
}
/*
* Return error command if necessary.
*/
private void processExceptionForSingleCommand(RemotingContext ctx, Object msg, Throwable t) {
final int id = ((RpcCommand) msg).getId();
final String emsg = "Exception caught when processing "
+ ((msg instanceof RequestCommand) ? "request, id=" : "response, id=");
logger.warn(emsg + id, t);
if (msg instanceof RequestCommand) {
final RequestCommand cmd = (RequestCommand) msg;
if (cmd.getType() != RpcCommandType.REQUEST_ONEWAY) {
if (t instanceof RejectedExecutionException) {
final ResponseCommand response = this.commandFactory.createExceptionResponse(
id, ResponseStatus.SERVER_THREADPOOL_BUSY);
// RejectedExecutionException here assures no response has been sent back
// Other exceptions should be processed where exception was caught, because here we don't known whether ack had been sent back.
ctx.getChannelContext().writeAndFlush(response)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (logger.isInfoEnabled()) {
logger
.info(
"Write back exception response done, requestId={}, status={}",
id, response.getResponseStatus());
}
} else {
logger.error(
"Write back exception response failed, requestId={}", id,
future.cause());
}
}
});
}
}
}
}
/**
* @see CommandHandler#registerProcessor(com.alipay.remoting.CommandCode, RemotingProcessor)
*/
@Override
public void registerProcessor(CommandCode cmd,
@SuppressWarnings("rawtypes") RemotingProcessor processor) {
this.processorManager.registerProcessor(cmd, processor);
}
/**
* @see CommandHandler#registerDefaultExecutor(java.util.concurrent.ExecutorService)
*/
@Override
public void registerDefaultExecutor(ExecutorService executor) {
this.processorManager.registerDefaultExecutor(executor);
}
/**
* @see CommandHandler#getDefaultExecutor()
*/
@Override
public ExecutorService getDefaultExecutor() {
return this.processorManager.getDefaultExecutor();
}
}
命令的工厂类
CommandFactory 提供请求类型命令的创建,响应类型命令的创建 ,异常响应创建,超时响应创建,发送失败响应创建
public interface CommandFactory {
// ~~~ create request command
/**
* create a request command with request object
*
* @param requestObject the request object included in request command
* @param <T>
* @return
*/
<T extends RemotingCommand> T createRequestCommand(final Object requestObject);
// ~~~ create response command
/**
* create a normal response with response object
* @param responseObject
* @param requestCmd
* @param <T>
* @return
*/
<T extends RemotingCommand> T createResponse(final Object responseObject,
RemotingCommand requestCmd);
<T extends RemotingCommand> T createExceptionResponse(int id, String errMsg);
<T extends RemotingCommand> T createExceptionResponse(int id, final Throwable t, String errMsg);
<T extends RemotingCommand> T createExceptionResponse(int id, ResponseStatus status);
<T extends RemotingCommand> T createExceptionResponse(int id, ResponseStatus status,
final Throwable t);
<T extends RemotingCommand> T createTimeoutResponse(final InetSocketAddress address);
<T extends RemotingCommand> T createSendFailedResponse(final InetSocketAddress address,
Throwable throwable);
<T extends RemotingCommand> T createConnectionClosedResponse(final InetSocketAddress address,
String message);
}
其实现子类:
public class RpcCommandFactory implements CommandFactory {
@Override
public RpcRequestCommand createRequestCommand(Object requestObject) {
return new RpcRequestCommand(requestObject);
}
@Override
public RpcResponseCommand createResponse(final Object responseObject,
final RemotingCommand requestCmd) {
RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
if (null != responseObject) {
response.setResponseClass(responseObject.getClass().getName());
} else {
response.setResponseClass(null);
}
response.setSerializer(requestCmd.getSerializer());
response.setProtocolSwitch(requestCmd.getProtocolSwitch());
response.setResponseStatus(ResponseStatus.SUCCESS);
return response;
}
@Override
public RpcResponseCommand createExceptionResponse(int id, String errMsg) {
return createExceptionResponse(id, null, errMsg);
}
@Override
public RpcResponseCommand createExceptionResponse(int id, final Throwable t, String errMsg) {
RpcResponseCommand response = null;
if (null == t) {
response = new RpcResponseCommand(id, createServerException(errMsg));
} else {
response = new RpcResponseCommand(id, createServerException(t, errMsg));
}
response.setResponseClass(RpcServerException.class.getName());
response.setResponseStatus(ResponseStatus.SERVER_EXCEPTION);
return response;
}
@Override
public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status) {
RpcResponseCommand responseCommand = new RpcResponseCommand();
responseCommand.setId(id);
responseCommand.setResponseStatus(status);
return responseCommand;
}
@Override
public RpcResponseCommand createExceptionResponse(int id, ResponseStatus status, Throwable t) {
RpcResponseCommand responseCommand = this.createExceptionResponse(id, status);
responseCommand.setResponseObject(createServerException(t, null));
responseCommand.setResponseClass(RpcServerException.class.getName());
return responseCommand;
}
@Override
public ResponseCommand createTimeoutResponse(InetSocketAddress address) {
ResponseCommand responseCommand = new ResponseCommand();
responseCommand.setResponseStatus(ResponseStatus.TIMEOUT);
responseCommand.setResponseTimeMillis(System.currentTimeMillis());
responseCommand.setResponseHost(address);
return responseCommand;
}
@Override
public RemotingCommand createSendFailedResponse(final InetSocketAddress address,
Throwable throwable) {
ResponseCommand responseCommand = new ResponseCommand();
responseCommand.setResponseStatus(ResponseStatus.CLIENT_SEND_ERROR);
responseCommand.setResponseTimeMillis(System.currentTimeMillis());
responseCommand.setResponseHost(address);
responseCommand.setCause(throwable);
return responseCommand;
}
@Override
public RemotingCommand createConnectionClosedResponse(InetSocketAddress address, String message) {
ResponseCommand responseCommand = new ResponseCommand();
responseCommand.setResponseStatus(ResponseStatus.CONNECTION_CLOSED);
responseCommand.setResponseTimeMillis(System.currentTimeMillis());
responseCommand.setResponseHost(address);
return responseCommand;
}
/**
* create server exception using error msg, no stack trace
* @param errMsg the assigned error message
* @return an instance of RpcServerException
*/
private RpcServerException createServerException(String errMsg) {
return new RpcServerException(errMsg);
}
/**
* create server exception using error msg and fill the stack trace using the stack trace of throwable.
*
* @param t the origin throwable to fill the stack trace of rpc server exception
* @param errMsg additional error msg, <code>null</code> is allowed
* @return an instance of RpcServerException
*/
private RpcServerException createServerException(Throwable t, String errMsg) {
String formattedErrMsg = String.format(
"[Server]OriginErrorMsg: %s: %s. AdditionalErrorMsg: %s", t.getClass().getName(),
t.getMessage(), errMsg);
RpcServerException e = new RpcServerException(formattedErrMsg);
e.setStackTrace(t.getStackTrace());
return e;
}
}
网友评论