mpush是一个开源的消息推送系统,本文介绍消息编解码器的实现
mpush文档地址:http://mpush.mydoc.io/?t=134818
mpush实现了一个可扩展的私有消息协议。该协议由定长header和body组成,定长header为13个字节。
名称 | 类型 | 长度 | 说明 |
---|---|---|---|
length | int | 4 | 表示body的长度 |
cmd | byte | 1 | 表示消息协议类型 |
checkcode | short | 2 | 是根据body生成的一个校验码 |
flags | byte | 1 | 表示当前包启用的特性,比如是否启用加密,是否启用压缩 |
sessionId | int | 4 | 消息会话标识用于消息响应 |
lrc | byte | 1 | 纵向冗余校验,用于校验header |
编码器

编码器继承自MessageToByteEncoder,MessageToByteEncoder是一种 ChannelOutboundHandler的具体实现。其负责将入站数据从一种协议消息格式成字节流即Outbound ByteBuf。 在netty中使用解码器很简单,就是将入站数据转换格式后传递到 ChannelPipeline 中的下一个ChannelInboundHandler 进行处理,这样的处理是很灵活的,我们可以将解码器放在 ChannelPipeline 中。
public static void encodePacket(Packet packet, ByteBuf out) {
if (packet.cmd == Command.HEARTBEAT.cmd) {
out.writeByte(Packet.HB_PACKET_BYTE);
} else {
//ByteBuf存放字节流,Packet自定义协议的消息对象。
out.writeInt(packet.getBodyLength());
out.writeByte(packet.cmd);
out.writeShort(packet.cc);
out.writeByte(packet.flags);
out.writeInt(packet.sessionId);
out.writeByte(packet.lrc);
if (packet.getBodyLength() > 0) {
out.writeBytes(packet.body);
}
}
packet.body = null;
}
解码器

解码器继承自ByteToMessageDecoder,是一种 ChannelInboundHandler 的具体实现。负责将入站数据从一种字节流转成协议消息格式即packet对象。下图解码器的流程图。

根据协议第一个字节是心跳标志字节,所以先去缓冲区读取一个字节。然后判断缓冲区的可读字节是否大于一个packet头部长度,如果小于说明当前缓冲区内还没有一个完整的packet头部,因此继续等待直到缓冲区有足够的缓冲字节。如果大于则标记当前缓冲区索引,判断缓冲区可读字节字节是否大于packet包体和packet包头之和,如果大于说明缓冲区可读字节足够,根据协议创建packet对象。如果小于说明当前缓冲区可读字节不够,继续等待。
@Override
public final class PacketDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decodeHeartbeat(in, out);
decodeFrames(in, out);
}
private void decodeHeartbeat(ByteBuf in, List<Object> out) {
while (in.isReadable()) {
if (in.readByte() == Packet.HB_PACKET_BYTE) {
out.add(Packet.HB_PACKET);
} else {
in.readerIndex(in.readerIndex() - 1);
break;
}
}
}
private void decodeFrames(ByteBuf in, List<Object> out) {
if (in.readableBytes() >= Packet.HEADER_LEN) {
//1.记录当前读取位置位置.如果读取到非完整的frame,要恢复到该位置,便于下次读取
in.markReaderIndex();
Packet packet = decodeFrame(in);
if (packet != null) {
out.add(packet);
} else {
//2.读取到不完整的frame,恢复到最近一次正常读取的位置,便于下次读取
in.resetReaderIndex();
}
}
}
private Packet decodeFrame(ByteBuf in) {
int readableBytes = in.readableBytes();
int bodyLength = in.readInt();
if (readableBytes < (bodyLength + Packet.HEADER_LEN)) {
return null;
}
if (bodyLength > maxPacketSize) {
throw new TooLongFrameException("packet body length over limit:" + bodyLength);
}
return decodePacket(new Packet(in.readByte()), in, bodyLength);
}
}
网友评论