美文网首页
mpush消息编解码器代码解读

mpush消息编解码器代码解读

作者: 王剑_a9e1 | 来源:发表于2018-07-10 01:22 被阅读0次

    mpush是一个开源的消息推送系统,本文介绍消息编解码器的实现
    mpush文档地址:http://mpush.mydoc.io/?t=134818
    mpush实现了一个可扩展的私有消息协议。该协议由定长header和body组成,定长header为13个字节。

    名称 类型 长度 说明
    length int 4 表示body的长度
    cmd byte 1 表示消息协议类型
    checkcode short 2 是根据body生成的一个校验码
    flags byte 1 表示当前包启用的特性,比如是否启用加密,是否启用压缩
    sessionId int 4 消息会话标识用于消息响应
    lrc byte 1 纵向冗余校验,用于校验header

    编码器

    编码器继承自MessageToByteEncoder,MessageToByteEncoder是一种 ChannelOutboundHandler的具体实现。其负责将入站数据从一种协议消息格式成字节流即Outbound ByteBuf。 在netty中使用解码器很简单,就是将入站数据转换格式后传递到 ChannelPipeline 中的下一个ChannelInboundHandler 进行处理,这样的处理是很灵活的,我们可以将解码器放在 ChannelPipeline 中。

    public static void encodePacket(Packet packet, ByteBuf out) {
        if (packet.cmd == Command.HEARTBEAT.cmd) {
            out.writeByte(Packet.HB_PACKET_BYTE);
        } else {
            //ByteBuf存放字节流,Packet自定义协议的消息对象。
            out.writeInt(packet.getBodyLength());
            out.writeByte(packet.cmd);
            out.writeShort(packet.cc);
            out.writeByte(packet.flags);
            out.writeInt(packet.sessionId);
            out.writeByte(packet.lrc);
            if (packet.getBodyLength() > 0) {
                out.writeBytes(packet.body);
            }
        }
        packet.body = null;
    }
    

    解码器


    解码器继承自ByteToMessageDecoder,是一种 ChannelInboundHandler 的具体实现。负责将入站数据从一种字节流转成协议消息格式即packet对象。下图解码器的流程图。



    根据协议第一个字节是心跳标志字节,所以先去缓冲区读取一个字节。然后判断缓冲区的可读字节是否大于一个packet头部长度,如果小于说明当前缓冲区内还没有一个完整的packet头部,因此继续等待直到缓冲区有足够的缓冲字节。如果大于则标记当前缓冲区索引,判断缓冲区可读字节字节是否大于packet包体和packet包头之和,如果大于说明缓冲区可读字节足够,根据协议创建packet对象。如果小于说明当前缓冲区可读字节不够,继续等待。

    @Override
    public final class PacketDecoder extends ByteToMessageDecoder {
    
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            decodeHeartbeat(in, out);
            decodeFrames(in, out);
        }
    
        private void decodeHeartbeat(ByteBuf in, List<Object> out) {
            while (in.isReadable()) {
                if (in.readByte() == Packet.HB_PACKET_BYTE) {
                    out.add(Packet.HB_PACKET);
                } else {
                    in.readerIndex(in.readerIndex() - 1);
                    break;
                }
            }
        }
    
        private void decodeFrames(ByteBuf in, List<Object> out) {
            if (in.readableBytes() >= Packet.HEADER_LEN) {
                //1.记录当前读取位置位置.如果读取到非完整的frame,要恢复到该位置,便于下次读取
                in.markReaderIndex();
    
                Packet packet = decodeFrame(in);
                if (packet != null) {
                    out.add(packet);
                } else {
                    //2.读取到不完整的frame,恢复到最近一次正常读取的位置,便于下次读取
                    in.resetReaderIndex();
                }
            }
        }
    
        private Packet decodeFrame(ByteBuf in) {
            int readableBytes = in.readableBytes();
            int bodyLength = in.readInt();
            if (readableBytes < (bodyLength + Packet.HEADER_LEN)) {
                return null;
            }
            if (bodyLength > maxPacketSize) {
                throw new TooLongFrameException("packet body length over limit:" + bodyLength);
            }
            return decodePacket(new Packet(in.readByte()), in, bodyLength);
        }
    }
    

    相关文章

      网友评论

          本文标题:mpush消息编解码器代码解读

          本文链接:https://www.haomeiwen.com/subject/crmxpftx.html