美文网首页
2021-08-29_Netty基于长度的变长解码器源码学习笔记

2021-08-29_Netty基于长度的变长解码器源码学习笔记

作者: kikop | 来源:发表于2021-08-29 17:13 被阅读0次

    20210829_netty基于长度的变长解码器源码学习笔记2

    1概述

    LengthFieldBasedFrameDecoder基于长度的变长解码器,其核心要点:消息结构体中要有指定消息体或整个消息的长度说明。

    这里如果一个消息没有指定固定长度的话, Netty也给我们准备了一个编码器,即LengthFieldPrepender(消息发送之前预先长度编码),这两个类通常一起使用来解决半包与粘包的问题。

    常见的应用场景:专用的客户端、服务器程序。

    本机主要是结合代码分析LengthFieldBasedFrameDecoder解码时序过程。

    1.1LengthFieldBasedFrameDecoder解码器

    如果消息是通过长度进行区分(LengthFieldPrepender),此解码器可以处理粘包与半包问题。

    1.1.1构造函数

    public LengthFieldBasedFrameDecoder(
                int maxFrameLength,
                int lengthFieldOffset, // 长度偏移位置 
                int lengthFieldLength, // 长度占用的字节长度
                int lengthAdjustment,  // 消息内容读取结束标志调整,决定netty还要读多少个字节,就是一个完整的消息包
        int initialBytesToStrip) {     // 拿到一个完整的数据包之后向业务解码传递之前,应该跳过多少字节(有时想跳过指定的数据长度,或者屏蔽不感兴趣的数据)
            this(
                    maxFrameLength,
                    lengthFieldOffset, lengthFieldLength, lengthAdjustment,
                    initialBytesToStrip, true);
        }
    

    1.2pom依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <artifactId>technicaltools</artifactId>
            <groupId>com.kikop</groupId>
            <version>1.0-SNAPSHOT</version>
            <!--定义依赖的父pom文件-->
            <relativePath>../pom.xml</relativePath>
        </parent>
    
        <packaging>jar</packaging>
        <artifactId>mylengthfielddecoderdemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <description>mylengthfielddecoderdemo project</description>
    
    
        <properties>
            <junit.version>5.7.0</junit.version>
        </properties>
    
        <dependencies>
    
            <!--1.mytechcommon-->
            <dependency>
                <groupId>com.kikop</groupId>
                <artifactId>mytechcommon</artifactId>
                <version>2.0-SNAPSHOT</version>
            </dependency>
    
            <!--2.junit-->
            <!-- Common test dependencies -->
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-api</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-engine</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-params</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
                <version>${junit.version}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.13.1</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.hamcrest</groupId>
                <artifactId>hamcrest-library</artifactId>
                <version>1.3</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.assertj</groupId>
                <artifactId>assertj-core</artifactId>
                <version>3.18.0</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.mockito</groupId>
                <artifactId>mockito-core</artifactId>
                <version>2.18.3</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>io.github.artsok</groupId>
                <artifactId>rerunner-jupiter</artifactId>
                <version>2.1.6</version>
                <scope>test</scope>
            </dependency>
    
    
            <!--3.netty-->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <!--<version>${netty.version}</version>-->
                <version>4.1.6.Final</version>
            </dependency>
    
        </dependencies>
    
        <build>
    
        </build>
    
    </project>
    

    2代码示例

    2.1测试

    package com.kikop;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.TooLongFrameException;
    import org.junit.Assert;
    import org.junit.Test;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: LengthFieldBasedFrameDecoderTest
     * @desc 基于长度的变长解码器测试
     * @date 2021/8/20
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    public class LengthFieldBasedFrameDecoderTest {
    
    
         @Test
        public void testDiscardTooLongFrame1()
        {
            // 1.构造ByteBuf
            // 默认大端格式,refCnt=1,长度为:256,readerIndex=0,writeIndex=0
            // 高位:byte[0]:0,低位byte[3]:32
            ByteBuf buf = Unpooled.buffer();
    
            /*
             * | 4 byte|
             * +-------+
             * |  32   |        length = 4 bytes
             * +-------+
             */
            buf.writeInt(32);
    
            /*
             * | 4 bytes|          32bytes       |
             * +--------+---*---+-----------+----+
             * |   32   | 1 | 2 | ...       | 32 |        length = 36 bytes
             * +--------+---+---+-----------+----+
             */
            for (int i = 0; i < 32; i++) {
                buf.writeByte(i);
            }
    
            /*
             * | 4 bytes|          32bytes       | 4 bytes|
             * +--------+---*---+-----------+----+--------+
             * |   32   | 1 | 2 | ......... | 32 |    1   |        length = 40 bytes
             * +--------+---+---+-----------+----+--------+
             */
            buf.writeInt(1);
    
            /*
             * | 4 bytes|          32bytes       | 4 bytes|1 bytes|
             * +--------+---*---+-----------+----+--------+-------+
             * |   32   | 1 | 2 | ......... | 32 |    1   |   a   |        length = 41 bytes
             * +--------+---+---+-----------+----+--------+-------+
             */
            buf.writeByte('a');
            // lengthFieldEndOffset:4
            EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
                    16, 0, 4));
            try {
                // 逻辑:先收到后判断,一下子收到41字节,超过 maxFrameLength,所以抛弃:4+32内容
                // 紧接着还剩:5字节(1个int 和 1个byte)
                // 2.字节数据写入通道
                channel.writeInbound(buf);
                fail();
            } catch (TooLongFrameException e) {
                // expected
                e.printStackTrace();
            }
            assertTrue(channel.finish());
    
            // 3.从通道中读取数据
            ByteBuf b = channel.readInbound();
            assertEquals(5, b.readableBytes());
            assertEquals(1, b.readInt());
            assertEquals('a', b.readByte());
            b.release();
    
            assertNull(channel.readInbound());
            channel.finish();
        }
    
    }
    

    2.2Netty变长解码器源码分析之decode

    解码时,用到了设计模式中的构子函数 ByteToMessageDecoder::decode

    2.2.1LengthFieldBasedFrameDecoder类继承关系

    image-20210829154720769.png
    // 入站处理器 LengthFieldBasedFrameDecoder:ChannelInboundHandler
    public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
        public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
            public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
                public abstract class ChannelHandlerAdapter implements ChannelHandler {
                    public interface ChannelInboundHandler extends ChannelHandler {
                        public interface ChannelHandler {
    

    2.2.2AttributeMap如何破线程安全

    /**
     * Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
     *
     * Implementations must be Thread-safe.
     */
    public interface AttributeMap {
        /**
         * Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
         * an {@link Attribute} which does not have a value set yet.
         */
        <T> Attribute<T> attr(AttributeKey<T> key);
    
        /**
         * Returns {@code} true if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
         */
        <T> boolean hasAttr(AttributeKey<T> key);
    }
    

    2.2.2.1DefaultAttributeMap

    @SuppressWarnings("unchecked")
    @Override
    public <T> Attribute<T> attr(AttributeKey<T> key) {
            synchronized (head) {
                DefaultAttribute<?> curr = head;
    
    public <T> boolean hasAttr(AttributeKey<T> key) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
        if (attributes == null) {
            // no attribute exists
            return false;
        }
    
        int i = index(key);
        DefaultAttribute<?> head = attributes.get(i);
        if (head == null) {
            // No attribute exists which point to the bucket in which the head should be located
            return false;
        }
    
        // We need to synchronize on the head.
        synchronized (head) {
            // Start with head.next as the head itself does not store an attribute.
            DefaultAttribute<?> curr = head.next;
            while (curr != null) {
                if (curr.key == key && !curr.removed) {
                    return true;
                }
                curr = curr.next;
            }
            return false;
        }
    }
    

    2.2.3DefaultChannelPipeline

    /**
     * The default {@link ChannelPipeline} implementation.  It is usually created
     * by a {@link Channel} implementation when the {@link Channel} is created.
     * 通道创建时,链表已经创建完成
     */
    public class DefaultChannelPipeline implements ChannelPipeline {
            final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
        
    

    2.2.3.1HeadContext

    // HeadContext.class extends AbstractChannelHandlerContext
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
    
        private final Unsafe unsafe;
    
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
    
        @Override
        public ChannelHandler handler() {
            return this;
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // NOOP
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            // NOOP
        }
    
        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }
    
        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
    
        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.disconnect(promise);
        }
    
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.close(promise);
        }
    
        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            unsafe.deregister(promise);
        }
    
        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
    
        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.fireExceptionCaught(cause);
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
    
            // Remove all handlers sequentially if channel is closed and unregistered.
            if (!channel.isOpen()) {
                destroy();
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
    
            readIfIsAutoRead();
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
    
            readIfIsAutoRead();
        }
    
        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelWritabilityChanged();
        }
    }
    

    2.2.3.2触发顺序

    2.2.3.2.1DefaultChannelPipeline.class
    // 1.DefaultChannelPipeline.class
    @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
    2.2.3.2.2AbstractChannelHandlerContext.class
    // 2.AbstractChannelHandlerContext.class
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    
    private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // 获取入站处理器,DefaultChannelPipeline$HeadContext#0
                    // 并调用channelRead事件,数据:msg
                    
                    // DefaultChannelPipeline$HeadContext#0::channelRead
                    //-->AbstractChannelHandlerContext::channelRead
                    //---->LengthFieldBasedFrameDecoder#0::channelRead
                    //------>又回到该invokeChannelRead函数
                    //-------->最终到这里,形成一个大大的环
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                    
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
    2.2.3.2.3AbstractChannelHandlerContext.class(HeadContext)
    // 3.DefaultChannelPipeline.class
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // ctx:DefaultChannelPipeline$HeadContext#0
                // final class HeadContext extends AbstractChannelHandlerContext
                ctx.fireChannelRead(msg);
            }
    
    2.2.3.2.4AbstractChannelHandlerContext.class(HeadContext)
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // findContextInbound:查找下一个入站处理器
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    
    private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    
    
    2.2.3.2.5ByteToMessageDecoder
    public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) { 
                // 循环读取消息,知道无消息可读
                    // 第一次读取处理了4+32=36个字节
                    // 第二次读取剩下的5个字节
                    
                    int outSize = out.size();
    
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
    
                        // Check if this handler was removed before continuing with decoding.
                        // If it was removed, it is not safe to continue to operate on the buffer.
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/4635
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
    
                    int oldInputLength = in.readableBytes();
                    // 注意这里 decode被 LengthFieldBasedFrameDecoder 重写
                    decode(ctx, in, out);
    
        /**
         * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
         * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
         * {@link ByteBuf}.
         *
         * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param in            the {@link ByteBuf} from which to read data
         * @param out           the {@link List} to which decoded messages should be added
         * @throws Exception    is thrown if an error accour
         */
        protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    

    2.2.4LengthFieldBasedFrameDecoder.decode解码分析

    // 先回忆一下读取的ByteBuf
    /*
             * | 4 bytes|          32bytes       | 4 bytes|1 bytes|
             * +--------+---*---+-----------+----+--------+-------+
             * |   32   | 1 | 2 | ......... | 32 |    1   |   a   |        length = 41 bytes
             * +--------+---+---+-----------+----+--------+-------+
             */
    
    image-20210829155608890.png

    DefaultChannelPipeline$HeadContext#0:inbound=false,outbound=true

    LengthFieldBasedFrameDecoder#0:inbound=true,outbound=false

    DefaultChannelPipeline$TailContext#0:inbound=true,outbound=false

    /**
         * Create a frame out of the {@link ByteBuf} and return it.
         *
         * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param   in              the {@link ByteBuf} from which to read data
         * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
         *                          be created.
         */
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (discardingTooLongFrame) {
                long bytesToDiscard = this.bytesToDiscard;
                int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
                in.skipBytes(localBytesToDiscard);
                bytesToDiscard -= localBytesToDiscard;
                this.bytesToDiscard = bytesToDiscard;
    
                failIfNecessary(false);
            }
            // 1.in.readableBytes 可读字节长度:41,肯定要大于读取的长度,这是基本要求
            if (in.readableBytes() < lengthFieldEndOffset) {
                return null; // 什么意思
            }
    
            // 2.开始需要读取的长度Length偏移位置:
            // 第一次预期actualLengthFieldOffset:in.readerIndex():0+0==>0
            // 第二次预期actualLengthFieldOffset:36+0==>36
            int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
            
            // 3.获取字节in中长度headLen值:
            // 第一次lengthFieldLength:=4,frameLength=headLen值为:32
            // 第二次lengthFieldLength:=4,frameLength=headLen值为:1
            long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    
            if (frameLength < 0) {
                in.skipBytes(lengthFieldEndOffset);
                throw new CorruptedFrameException(
                        "negative pre-adjustment length field: " + frameLength);
            }
            // 4.按照数据结构
            // 第一次预期的完整消息长度(headLen+content)
            // 增加长度纠偏lengthAdjustment+lengthFieldEndOffset:32+0+4=36-->frameLength
            
            // 第二次预期的完整消息长度(headLen+content)
            // 增加长度纠偏lengthAdjustment+lengthFieldEndOffset:1+0+4=5-->frameLength
            frameLength += lengthAdjustment + lengthFieldEndOffset;
    
            if (frameLength < lengthFieldEndOffset) {
                in.skipBytes(lengthFieldEndOffset);
                throw new CorruptedFrameException(
                        "Adjusted frame length (" + frameLength + ") is less " +
                        "than lengthFieldEndOffset: " + lengthFieldEndOffset);
            }
    
            if (frameLength > maxFrameLength) {
                // 4.1.丢弃的长度 36-41=-5
                long discard = frameLength - in.readableBytes();
                tooLongFrameLength = frameLength;
    
                if (discard < 0) {
                    // buffer contains more bytes then the frameLength so we can discard all now
                    
                    // 4.2.直接丢弃前面的字节,跳过完整消息报frameLength个长度
                    // 继续下一轮回的decode解码器
                    // 第一次后,此时buf中in 读写索引变化情况如下:
                    // readIndex变化情况:0-->36
                    // writerIndex:41-->41(缓存字节没变)
                    in.skipBytes((int) frameLength);
                } else {
                    // Enter the discard mode and discard everything received so far.
                    discardingTooLongFrame = true;
                    bytesToDiscard = discard;
                    // 直接丢弃前面的字节
                    in.skipBytes(in.readableBytes());
                }
                failIfNecessary(true);
                // 继续下一次 while ByteToMessageDecode解码
                return null;
            }
    
            // 5.完整的消息包
            // never overflows because it's less than maxFrameLength
            int frameLengthInt = (int) frameLength;
            
            // 第一次 in.readableBytes():41
            // 第二次 in.readableBytes():5
            if (in.readableBytes() < frameLengthInt) {
                return null;
            }
    
            if (initialBytesToStrip > frameLengthInt) {
                in.skipBytes(frameLengthInt);
                throw new CorruptedFrameException(
                        "Adjusted frame length (" + frameLength + ") is less " +
                        "than initialBytesToStrip: " + initialBytesToStrip);
            }
            in.skipBytes(initialBytesToStrip);
    
            // extract frame
            // 6.组装最终ByteBuf:frame
            // in.readerIndex():36
            int readerIndex = in.readerIndex();
            
            int actualFrameLength = frameLengthInt - initialBytesToStrip; // 5-0(部分场景需要)
            
            // in:io.netty.buffer.UnpooledUnsafeHeapByteBuf
            // frame:io.netty.buffer.UnpooledSlicedByteBuf        
      
            // extractFrame具体动作:buffer.retainedSlice(index, length):构造新的 ByteBuf,refCnt:2与in一样,同一个指针引用
            ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
            // 修改in.readerIndex下标
            in.readerIndex(readerIndex + actualFrameLength);
            return frame;
        }
    
    // 获取具体消息体buf中 headLen长度的值
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        long frameLength;
        switch (length) {
        case 1:
            frameLength = buf.getUnsignedByte(offset);
            break;
        case 2:
            frameLength = buf.getUnsignedShort(offset);
            break;
        case 3:
            frameLength = buf.getUnsignedMedium(offset);
            break;
        case 4:
            frameLength = buf.getUnsignedInt(offset);
            break;
        case 8:
            frameLength = buf.getLong(offset);
            break;
        default:
            throw new DecoderException(
                    "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
        }
        return frameLength;
    }
    

    3总结

    3.1项目应用

    3.1.1自定义消息对应的变长参数

    @MessageDecodeAnnotattion(
            maxFrameLength = 1024,
            lengthFieldOffset = 5, // 跳过前面5个字节就到长度字段
            lengthFieldLength = 1, // 消息内容数据长度
            lengthAdjustment = 0,  // 完整包的进行长度微调
            initialBytesToStrip = 0, // 数据全要 ok
            failFast = true, readerIdleTimeSeconds = 30, writerIdleTimeSeconds = 0, allIdleTimeSeconds = 0)
    

    3.1.2自定义消息结构

    @OffsetAnnoattion(position = 0, datType = EnumDataType.ByteArray, DataLength = 2)
    private byte[] head;        // 数组,2长度。头部标识 0xAAAA
    
    @OffsetAnnoattion(position = 2, datType = EnumDataType.Byte)
    private byte addr;          // 地址,1表示设备收
    
    @OffsetAnnoattion(position = 3, datType = EnumDataType.Byte)
    private byte sys;           // 系统号,0表示系统1或通道1或射频1,1表示系统2或通道2或射频2
    
    @OffsetAnnoattion(position = 4, datType = EnumDataType.Byte)
    private byte type;          // 消息类型
    
    @OffsetAnnoattion(position = 5, datType = EnumDataType.Byte)
    private byte data_length;   // 消息数据长度,紧随其后的是消息ID和消息数据长度的总和,如果这个值是:00x10且lengthAdjustment = 0,表示还要读16个字节,netty才能任务是一个完整的包
    
    @OffsetAnnoattion(position = 6, datType = EnumDataType.Short)
    private short messageid;    // 消息ID [是上位机每发一个消息的递增序号,可以总是填充全0]
    
    @OffsetAnnoattion(position = 8, datType = EnumDataType.ByteArray, DataLength = -1)
    private byte[] content_data; // 消息数据[254]
    

    3.1.3获取指定属性名称的字节内容

    涉及知识点:.capacity()、.position(XXX)、get(dest,0,length)

    /**
     * 获取指定属性名称的字节数组
     @param propertyName 属性名称,如content_data(数组类型)
     * @return 字节数据
     */
    protected byte[] GetBytes(String propertyName) {
        
        // 1.获取OffsetAnnoattion字段注解参数
        PropertyInfo info = ParseOffsetAnnoattion(propertyName);
        OffsetAnnoattion annoation = info.getOffset();
        int dataLength = annoation.DataLength();
        // 2.如果长度没有指定,则直接当前字节流去掉前面的已知长度
        int position = annoation.position();
        if (dataLength < 0) {
            // java.nio.ByteBuffer.m_buffer:完整消息包
            dataLength = this.m_buffer.capacity() - position;
        }
        // 3.构建指定长度的数组
        byte[] dest = new byte[dataLength];
        // 4.数组校验
        if (annoation.datType() != EnumDataType.ByteArray) {
            return null;
        }
        // 5.填充dest字节数组
        ((ByteBuffer) m_buffer.position(position)).get(dest, 0, dataLength);
        // 6.返回dest
        return dest;
    }
    

    参考

    1.1【看完就会】Netty的LengthFieldBasedFrameDecoder的用法详解

    https://blog.csdn.net/zougen/article/details/79037675

    1.2【Netty】decoder相关(四):长度域解码器LengthFieldBasedFrameDecoder

    https://blog.csdn.net/qq_33347239/article/details/104337384

    1.3netty 中LengthFieldPrepender与LengthFieldBasedFrameDecoder

    https://blog.csdn.net/cgj296645438/article/details/90667419

    1.4Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题

    https://www.jianshu.com/p/2f4b9ecf2fdf

    1.5Netty-源码分析ByteBuf-readSlice和readRetainedSlice使用细节

    https://blog.csdn.net/nimasike/article/details/103462546

    相关文章

      网友评论

          本文标题:2021-08-29_Netty基于长度的变长解码器源码学习笔记

          本文链接:https://www.haomeiwen.com/subject/wafgiltx.html