上篇以Transaction类消息为例浅析了Cat消息发送流程。本篇来了解下Cat客户端和服务端的消息协议约定。
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat
对于一串消息流,我们必须能确定消息边界,提取出单条消息的字节流片段,然后对这个片段按照一定的规则进行反序列化来生成相应的消息对象。
在Java中,只要一个类实现了java.io.Serializable接口,那么它就可以被序列化。但是通过公共接口编码的字节会有很多冗余信息来保证不同对象与字节之间的正确编解码,在CAT中,需要传输的只有MessageTree这么一个对象。通过自定义的序列化方案可以节省许多不必要的字节信息,保证网络传输的高效性。
class PlainTextMessageCodec {
@Override
public ByteBuf encode(MessageTree tree) {
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(4 * 1024);
int count = 0;
int index = buf.writerIndex();
buf.writeInt(0); // place-holder
count += encodeHeader(tree, buf);
if (tree.getMessage() != null) {
count += encodeMessage(tree.getMessage(), buf);
}
buf.setInt(index, count);
return buf;
}
protected int encodeHeader(MessageTree tree, ByteBuf buf) {
BufferHelper helper = m_bufferHelper;
int count = 0;
count += helper.write(buf, VERSION);
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getDomain());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getHostName());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getIpAddress());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getThreadGroupName());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getThreadId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getThreadName());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getParentMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getRootMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getSessionToken());
count += helper.write(buf, LF);
return count;
}
public int encodeMessage(Message message, ByteBuf buf) {
if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
List<Message> children = transaction.getChildren();
if (children.isEmpty()) {
return encodeLine(transaction, buf, 'A', Policy.WITH_DURATION);
} else {
int count = 0;
int len = children.size();
count += encodeLine(transaction, buf, 't', Policy.WITHOUT_STATUS);
for (int i = 0; i < len; i++) {
Message child = children.get(i);
if (child != null) {
count += encodeMessage(child, buf);
}
}
count += encodeLine(transaction, buf, 'T', Policy.WITH_DURATION);
return count;
}
} else if (message instanceof Event) {
return encodeLine(message, buf, 'E', Policy.DEFAULT);
} else if (message instanceof Trace) {
return encodeLine(message, buf, 'L', Policy.DEFAULT);
} else if (message instanceof Metric) {
return encodeLine(message, buf, 'M', Policy.DEFAULT);
} else if (message instanceof Heartbeat) {
return encodeLine(message, buf, 'H', Policy.DEFAULT);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s.", message));
}
}
protected int encodeLine(Message message, ByteBuf buf, char type, Policy policy) {
BufferHelper helper = m_bufferHelper;
int count = 0;
count += helper.write(buf, (byte) type);
count += helper.write(buf, TAB);
count += helper.writeRaw(buf, message.getType());
count += helper.write(buf, TAB);
count += helper.writeRaw(buf, message.getName());
count += helper.write(buf, TAB);
count += helper.writeRaw(buf, message.getStatus());
count += helper.write(buf, TAB);
Object data = message.getData();
count += helper.writeRaw(buf, String.valueOf(data));
count += helper.write(buf, TAB);
count += helper.write(buf, LF);
}
}
序列化方案采用了常用的特殊分隔符法和长度前缀法。相比较于protobuff等开源协议,在流量的极致优化和便于理解之间进行了折衷。
被序列化的字节码包含3个部分:
1、 前4个字节包含整组字节串的长度,首先通过buf.writeInt(0)占位,编码完通过buf.setInt(index, count)将字节码长度写入buf头4个字节。
2、编码消息树的头部,依次将tree的version, domain, hostName, ipAdress, treadGroupName, treadId, threadName, MessageId, parentMessageId, rootMessageId, sessionToken写入头部,字段之间以"\t"分隔,并以"\n"结尾。空用null表示。
3、编码消息体,每个消息都是以一个表示消息类型的字符开头。
a."A"表示没有嵌套其他类型消息的事务,
b.有嵌套其他消息的事务,以一个 "t" 开头,然后递归去遍历并编码子消息, 最后以一个"T"结束,
c."E"/"L"/"M"/"H"分别表示Event/Trace/Metric/Heartbeat类型消息;
然后依次记录时间、type、name
然后根据条件依次写入status、duration+us、data
字段之间依然以"\t"分割,以"\n"结尾,空用null表示
网友评论