美文网首页
Netty学习整理

Netty学习整理

作者: 愤怒的老照 | 来源:发表于2021-04-07 21:00 被阅读0次

    1 复习NIO

    https://www.jianshu.com/p/b36b4e8c1343

    2 Netty整体架构

    image.png

    2.1 网络通信层

    网络通信层的职责是执行网络 I/O 的操作。它支持多种网络协议和 I/O 模型的连接操作。当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。

    网络通信层的核心组件包含BootStrap&ServerBootStrap、Channel三个组件。

    • BootStrap(客户端) & ServerBootStrap(服务端):负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。
    • Channel:Channel提供了基本的 API 用于网络 I/O 操作,提供了与底层 Socket 交互的能力,如 register、bind、connect、read、write、flush 等。当然 Channel 会有多种状态,如连接建立、连接注册、数据读写、连接销毁等。随着状态的变化,Channel 处于不同的生命周期,每一种状态都会绑定相应的事件回调:


      image.png

    2.2 事件调度层

    事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。事件调度层的核心组件包括 EventLoopGroup、EventLoop。

    • EventLoopGroup :本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求
    • EventLoop:同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。

    EventLoopGroup与线程模型的对应?
    Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:

    • 单线程模型:EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;

    • 多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;

    • 主从多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。

    2.3 服务编排层

    服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

    服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。

    • ChannelPipeline:负责组装各种 ChannelHandler,当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。
    • ChannelHandler & ChannelHandlerContext:ChannelHandlerContext 用于保存 ChannelHandler 上下文,通过 ChannelHandlerContext 我们可以知道 ChannelPipeline 和 ChannelHandler 的关联关系

    3 服务启动流程

    public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
    • 配置线程池:当前配置采用主从Reactor模式。Boss 是主 Reactor,Worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept,然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。

    如果是使用group(EventLoopGroup group)方法,最终调用逻辑如下:

    /**
         * Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
         */
        @Override
        public ServerBootstrap group(EventLoopGroup group) {
            return group(group, group);
        }
    

    可以看出Netty 线程模型的可定制化程度很高,它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型
    在注册channel时,group通过chooser(有GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser两种)选取eventLoop,将channel和eventLoop绑定,之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理

    ServerBootstrap 中 bind() 实现:

    • 创建channel
     final ChannelFuture regFuture = initAndRegister();
    
    // initAndRegister实现
    // 1、通过反射创建服务端Channel
    channel = channelFactory.newChannel();
    
    // 通过工厂类ReflectiveChannelFactory反射创建channel,类型是最开始设置的(NioServerSocketChannel)
    public NioServerSocketChannel() {
        // 创建服务端ServerSocketChannel
        this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 
    }
    
    // 在创建Channel时,注册感兴趣的事件(OP_ACCEPT)
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
    // Channel父类构造函数
    // 1、调用 AbstractChannel 的构造函数创建了三个重要的成员变量,分别为 id、unsafe、pipeline。id 表示全局唯一的 Channel,unsafe 用于操作底层数据的读写操作,pipeline 负责业务处理器的编排。
    // 2、可以看到每创建一个channel,都有与之对应的pipeline,pipeline最初包含headContext和TailContext两个节点
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId(); // Channel 全局唯一 id 
        unsafe = newUnsafe(); // unsafe 操作底层读写
        pipeline = newChannelPipeline(); // pipeline 负责业务处理器编排
    }
    
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // 创建 JDK 底层的 ServerSocketChannel
            return provider.openServerSocketChannel(); 
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    
    
    • 初始化channel
     final ChannelFuture regFuture = initAndRegister();
    
     init(channel);
    
    // 1、添加特殊的 Handler 处理器ChannelInitializer,并引入另一个特殊的Handler处理器ServerBootstrapAcceptor
    // ChannelInitializer:提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。
    
    //  ServerBootstrapAcceptor:成功构造客户端 NioSocketChannel 后, pipeline.fireChannelRead() 触发 channelRead 事件传播。会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。
    
    p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
    
    • 注册channel
    ChannelFuture regFuture = config().group().register(channel);
    
    // 我们初始化的group通常是MultithreadEventLoopGroup类型的,当注册时会使用选择器选择一个eventLoop进行注册
    public ChannelFuture register(Channel channel) {
        return next().register(channel); // 选择一个 eventLoop 注册
    }
    
    // 调用 JDK 底层的 register() 进行注册
    doRegister(); 
    
    //调用 JDK 底层的 register() 进行注册。register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象,调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时,Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 
    
    • 注册完成后的事件触发(涉及pipeline如何和自定义的Handler关联)
    // 1、用户自定义的业务处理器添加到 Pipeline 中
    pipeline.invokeHandlerAddedIfNeeded();
    
    // 通过invokeHandlerAddedIfNeeded调用Handler的handlerAdded实现,此处和之前创建的ChannelInitializer关联起来了
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
           if (initMap.add(ctx)) {
            try {
           //  调用ChannelInitializer实现类的具体实现
           // 1、在服务端Handler:对应的是添加ServerBootstrapAcceptor这个特殊的Handler,并且在处理完之后会删除ChannelInitializer,此时Handler责任链是HeadContext<->LogHandler<->ServerBootstrapAcceptor<->TailContext
           // 2、在客户端Handler:对应的就是EchoServer添加的自定义Handler
                initChannel((C) ctx.channel()); 
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    // 处理完之后会删除ChannelInitializer
                    pipeline.remove(this); 
                }
            }
            return true;
        }
        return false;
        }
    
    // 2、触发channelRegistered事件
    pipeline.fireChannelRegistered(); 
    
    • 端口绑定
    // ...省略其他代码
    javaChannel().bind(localAddress, config.getBacklog());
    
    // 完成端口绑定之后,Channel 处于活跃 Active 状态,然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件
    pipeline.fireChannelActive(); // 触发 channelActive 事件
    

    4 处理请求流程

    NioEventLoop核心处理逻辑

    protected void run() {
            // 1、入口死循环,不断检测IO事件并处理任务
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
    
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    
    • 轮询 I/O 事件
    selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
    
    // Netty 的任务队列包括普通任务、定时任务以及尾部任务,hasTask() 判断的是普通任务队列和尾部队列是否为空,而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。
    hasTasks()
    
        @Override
        protected boolean hasTasks() {
            return super.hasTasks() || !tailTasks.isEmpty();
        }
    
    // 选择策略返回值
    // 如果包含普通任务和尾部任务,则调用selectNowSupplier的值;当 NioEventLoop 线程的不存在异步任务,即任务队列为空,返回的是 SELECT 策略
        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    
    // 如果当前 NioEventLoop 线程存在异步任务,会通过 selectSupplier.get() 最终调用到 selectNow() 方法,selectNow() 是非阻塞,执行后立即返回。
        private final IntSupplier selectNowSupplier = new IntSupplier() {
            @Override
            public int get() throws Exception {
                return selectNow();
            }
        };
    
        int selectNow() throws IOException {
            try {
                return selector.selectNow();
            } finally {
                // restore wakeup state if needed
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
        }
    
    
    // 策略分两种情况
    // 1、调用selectNow非阻塞查询,有就绪的channel,则直接跳到break执行下面的逻辑
    // 2、调用selectNow无就绪结果,返回SELECT策略,调用select(wakenUp.getAndSet(false));
    
    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                    }
    
    
    • 调用select阻塞方法
    select(boolean oldWakenUp)
    
    // 1、计算 select 阻塞操作的最后截止时间,delayNanos为最近的一个定时任务,如果没有默认是一分钟
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 
    
    // 2、检测 select 阻塞操作是否超过截止时间,timeoutMillis<0说明有定时任务要执行,需要立即退出;但是预留了0.5ms的窗口时间,为了方便舍去小数
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    
    // 3、轮询过程中及时处理产生的任务
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
           selector.selectNow();
           selectCnt = 1;//将已轮训次数重置为1
           break;
     }
    
    // 4、将selector阻塞timeoutMillis毫秒
    // 这里如果定时任务时间过长,肯定不能等待很长时间
    // 在任务添加的时候,会调用wakeup方法唤醒线程,避免等待时间过长
    int selectedKeys = selector.select(timeoutMillis);
    
    // 5、满足一下任意条件,则推吹循环
    /**
                * selectedKeys != 0 表示轮询到有已经就绪的IO事件
                * wakenUp.get() 表示是否被用户唤醒
                * hasTasks() 表示普通任务队列中有未完成的任务
                * hasScheduledTasks() 表示定时任务队列中有未完成的任务
                * 上述条件任何一个条件为真,则退出select,准备处理对应任务
                */
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
    
    // 6、执行时间 >= 超时时间,说明selector执行正常,轮询次数置为1
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    selectCnt = 1;
                } 
    
    // 7、执行时间 <超时时间,并且轮询次数达到阈值,说明发生空轮询,重新构建selector并关联channel
    else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);
    
                    rebuildSelector();
                    selector = this.selector;
    
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
    
    • 处理IO事件
    // 处理IO事件有两种方式,只看一下processSelectedKeysPlain
    // 1、processSelectedKeysOptimized,处理Netty 优化过的 selectedKeys
    // 2、processSelectedKeysPlain,正常的处理逻辑
    
    // 1、处理连接事件。表示 TCP 连接建立成功, Channel 处于 Active 状态
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
    // 2、处理可写事件。表示上层可以向 Channel 写入数据,通过执行 ch.unsafe().forceFlush() 操作,将数据冲刷到客户端,最终会调用 javaChannel 的 write() 方法执行底层写操作
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
    // 3、处理可读事件。可读事件。表示 Channel 收到了可以被读取的新数据。依次调用 ChannelHandler 的 channelRead() 方法处理数据
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
    
    • 任务执行
    // 1、将定时任务和普通任务合并
    fetchedAll = fetchFromScheduledTaskQueue();
    // 2、循环执行任务,就是直接调用的 Runnable 的 run() 方法。
    safeExecute(task); // 执行任务
    // 3、收尾工作,执行尾部队列任务,并不常用,一般用于统计
        protected void afterRunningAllTasks() {
            runAllTasksFrom(tailTasks);
        }
    
    • 数据在 Pipeline 中的运转
      1、数据读
    // 1、首先通过EventLoop处理读事件,读取完成后触发channelRead
    pipeline.fireChannelRead(byteBuf);
    
    // 2、Pipeline的fireChannelRead调用了公用的invokeChannelRead,因为要从Head触发,所以此时传入的next为Head
    @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
    
    // 3、调用channelContext的invokeChannelRead
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    
    // 4、调用context绑定handler的channelRead,处理具体逻辑
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
    // 5、如果没有实现channelRead方法,则选取下一个context,递归执行第三步
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
    

    2、数据写

    // 1、调用writeAndFlush写数据,无论是从哪里开始写,都会调用Tail的writeAndFlush
        @Override
        public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            return tail.writeAndFlush(msg, promise);
        }
    
    // 2、与读数据相反,写数据查找下一个节点是通过查找prev指针
        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    
    // 3、递归调用到HeadContext,通过unsafe写入socket
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }
    

    3、异常处理

    // 1 、发生异常后,调用fireExceptionCaught
    pipeline.fireExceptionCaught(e);
    
    // 2、从HeadContext开始传递异常信息
        @Override
        public final ChannelPipeline fireExceptionCaught(Throwable cause) {
            AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
            return this;
        }
    
    // 3、同样的递归调用到TailContext的exceptionCaught,作为兜底逻辑,打印日志并释放资源
    protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }
    
    // 4、由于异常信息是从HeadContext开始传递,所以最佳实践是将自定义的异常处理器放在TailContext前、其他自定义Handler后。可以保证如论是哪里发生了异常,自定义异常处理器都可以处理到
    

    Netty处理粘包拆包

    TCP 传输协议是面向流的,没有数据包界限。可能受MTU 传输单元大小、MSS 最大分段大小、滑动窗口等因素影响,将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。因此就有了拆包和粘包。

    • ByteToMessageDecoder抽象类
    // 1、ByteToMessageDecoder继承ChannelInboundHandlerAdapter,可以看出ByteToMessageDecoder解码器应该放在HeadContext后、其他Handler前的位置
    
    // 2、当channel可读时,触发pipeline的channelRead实现,对应到ByteToMessageDecoder 的实现如下:
                   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //......省略其他内容
                      CodecOutputList out = CodecOutputList.newInstance();
                try {
                    // channelRead的msg是ByteBuf类型
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;
                   // 如果是第一次,则初始化cumulation
                    if (first) {
                        cumulation = data;
                    } else {
                        // 不是第一次,追加到cumulation
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
    
                    // 对数据进行处理
                    callDecode(ctx, cumulation, out);
    
    //......省略其他内容
        }
    
        protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while(true) {
                    if (in.isReadable()) {
                        // out中存放的是完整的对象集合,如果不为空,就可以接着触发channelRead
                        int outSize = out.size();
                        if (outSize > 0) {
                            // 循环推进out集合的内容
                            fireChannelRead(ctx, out, outSize);
                            out.clear();
                            if (ctx.isRemoved()) {
                                return;
                            }
    
                            outSize = 0;
                        }
    
                        int oldInputLength = in.readableBytes();
      
                        // 解码的核心逻辑
                        this.decodeRemovalReentryProtection(ctx, in, out);
                     //......省略
                    }
    
                    return;
                }
            } catch (DecoderException var6) {
                throw var6;
            } catch (Exception var7) {
                throw new DecoderException(var7);
            }
        }
    
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            // 设置当前状态为正在解码
            decodeState = STATE_CALLING_CHILD_DECODE;
            try {
                // 解码
                decode(ctx, in, out);
            } finally {
                // 执行hander的remove操作
                boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
                decodeState = STATE_INIT;
                if (removePending) {
                    handlerRemoved(ctx);
                }
            }
        }
    
        // 子类都重写了该方法,每种实现都会有自己特殊的解码方式
        // 有FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder
        protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    
    
    • 最简单的解决方案:FixedLengthFrameDecoder
    @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
               // 如果有符合要求的对象,添加到集合中,由父类继续推动channelRead,此时的对象是一个完整的对象
                out.add(decoded);
            }
        }
    
        protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            // 收集到的数据是否小于固定长度,小于就代表无法解析,直接返回空;并等待ByteBuf中的数据,使能够达到固定长度
            
            if (in.readableBytes() < frameLength) {
                return null;
            } else {
                return in.readRetainedSlice(frameLength);
            }
        }
    
    

    相关文章

      网友评论

          本文标题:Netty学习整理

          本文链接:https://www.haomeiwen.com/subject/wmzukltx.html