美文网首页
Netty框架

Netty框架

作者: 依弗布德甘 | 来源:发表于2020-02-01 21:14 被阅读0次

    Netty是什么

    Netty是一个高性能、高可扩展的异步事件驱动的网络应用程序框架,它极大的简化了TCP和UDP客户端和服务器开发等网络编程

    Netty重要的四个内容

    1. Reactor线程模型:一种高性能的多线程程序设计思路
    2. Netty中自定义的Channel概念:增强版的通道概念
    3. ChannelPipeline职责链设计模式:事件处理机制
    4. 内存管理:增强的ByteBuf缓存区

    Netty整体结构

    netty结构图

    图片来自Netty官网 https://netty.io/

    • 支撑Socket等多种传输方式
    • 提供了多种协议的编码实现
    • 核心设计包含事件处理模型、API的使用、ByteBuffer的增强

    Netty EchoServer 代码示例

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * Echoes back any received data from a client.
     */
    public final class EchoServer {
        static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    
        public static void main(String[] args) throws Exception {
            // Configure the server.
            // 创建EventLoopGroup   accept线程组 NioEventLoop
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 创建EventLoopGroup   I/O线程组
            EventLoopGroup workerGroup2 = new NioEventLoopGroup(1);
            try {
                // 服务端启动引导工具类
                ServerBootstrap b = new ServerBootstrap();
                // 配置服务端处理的reactor线程组以及服务端的其他配置
                b.group(bossGroup, workerGroup2).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new EchoServerHandler());
                    }
                });
                // 通过bind启动服务
                ChannelFuture f = b.bind(PORT).sync();
                // 阻塞主线程,知道网络服务被关闭
                f.channel().closeFuture().sync();
            } finally {
                // 关闭线程组
                bossGroup.shutdownGracefully();
                workerGroup2.shutdownGracefully();
            }
        }
    }
    
    • bind():绑定端口
    • group() : 设定accept,客户端连接,所有I/O事件由谁处理
    • channel():创建具体的通道
    • handler():处理服务端通道请求
    • option():通道相关配置
    • childHandler():处理具体客户端socket连接后的请求

    Netty线程模型

    为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型;Reactor模型中有四个核心概念:

    1. Resources资源(请求/任务)
    2. Synchronous Event Demultiplexer 同步事件复用器
    3. Dispatcher分配器
    4. Request Handler 请求处理器

    EventLoopGroup 事件轮询器

    EventLoopGroup EventLoop

    Channel概念

    netty中的Channel是一个抽象的概念,可以理解为对JDK NIO Channel的增强和拓展;

    AbstractChannel常见的属性和方法

    • Pipeline 通道内事件处理链路
    • EventLoop 绑定的EventLoop,用于执行操作
    • Unsafe 提供 I/O相关的封装操作
    • config() 返回通道配置信息
    • read() 开始读数据,触发读取链路调用
    • write() 写数据,触发链路调用
    • bind() 绑定

    设计模式-责任链模式

    发起请求和具体处理请求的过程解偶:职责链上的处理者负责处理请求,客户只需将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递

    责任链模式
    实现责任链模式4个要素
    • 处理器抽象类
    • 具体的处理器实现类
    • 保存处理器信息
    • 处理执行
    
    // -----链表形式调用------netty就是类似的这种形式
    public class PipelineDemo {
        /**
         * 初始化的时候造一个head,作为责任链的开始,但是并没有具体的处理
         */
        public HandlerChainContext head = new HandlerChainContext(new AbstractHandler() {
            @Override
            void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
                handlerChainContext.runNext(arg0);
            }
        });
    
        public void requestProcess(Object arg0) {
            this.head.handler(arg0);
        }
    
        public void addLast(AbstractHandler handler) {
            HandlerChainContext context = head;
            while (context.next != null) {
                context = context.next;
            }
            context.next = new HandlerChainContext(handler);
        }
    
    
        public static void main(String[] args) {
            PipelineDemo pipelineChainDemo = new PipelineDemo();
            pipelineChainDemo.addLast(new Handler2());
            pipelineChainDemo.addLast(new Handler1());
            pipelineChainDemo.addLast(new Handler1());
            pipelineChainDemo.addLast(new Handler2());
    
            // 发起请求
            pipelineChainDemo.requestProcess("火车呜呜呜~~");
    
        }
    }
    
    /**
     * handler上下文,我主要负责维护链,和链的执行
     */
    class HandlerChainContext {
        HandlerChainContext next; // 下一个节点
        AbstractHandler handler;
    
        public HandlerChainContext(AbstractHandler handler) {
            this.handler = handler;
        }
    
        void handler(Object arg0) {
            this.handler.doHandler(this, arg0);
        }
    
        /**
         * 继续执行下一个
         */
        void runNext(Object arg0) {
            if (this.next != null) {
                this.next.handler(arg0);
            }
        }
    }
    
    // 处理器抽象类
    abstract class AbstractHandler {
        /**
         * 处理器,这个处理器就做一件事情,在传入的字符串中增加一个尾巴..
         */
        abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); // handler方法
    }
    
    // 处理器具体实现类
    class Handler1 extends AbstractHandler {
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
            arg0 = arg0.toString() + "..handler1的小尾巴.....";
            System.out.println("我是Handler1的实例,我在处理:" + arg0);
            // 继续执行下一个
            handlerChainContext.runNext(arg0);
        }
    }
    
    // 处理器具体实现类
    class Handler2 extends AbstractHandler {
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
            arg0 = arg0.toString() + "..handler2的小尾巴.....";
            System.out.println("我是Handler2的实例,我在处理:" + arg0);
            // 继续执行下一个
            handlerChainContext.runNext(arg0);
        }
    }
    

    Netty中的ChannelPipeline责任链

    ChannelPipeline

    入站事件和出站事件

    入站事件:通常指I/O线程成了入站数据

    (通俗理解:从socket底层自己往上冒上来的事件都是入站)
    比如EventLoop收selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer)接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用

    出站事件:经常指I/O线程执行实际的输出操作

    (通俗理解:想主动往socket底层操作的事件都是出站)
    比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的ChannelPipeline中包含的下一个出站处理器中的bind方法被调用

    Netty中的事件定义
    Netty中的事件定义

    Pipeline中的handler是什么

    ChannelHandler
    用于处理I/O事件或拦截I/O操作,并转发到ChannelPipeline中的下一个处理器
    这个顶级接口定义功能很弱,实际使用时会去实现下面两大子接口:

    • 处理入站I/O事件的ChannelInboundHandler
    • 处理出站I/O操作的ChannelOutboundHandler

    适配器类
    为了开发方便,避免所有handler去实现一遍接口方法,Netty提供了简单实现类

    • ChannelInboundHandlerAdapter处理入站I/O事件
    • ChannelOutbundHandlerAdapter 处理出站I/O操作
    • ChannelDuplexHandler 支持同时处理入站和出站事件

    ChannelHandlerContext
    实际存储在Pipeline中的对象并非ChannelHandler,而是上下文对象
    将handler,包裹在上下文对象中,通过上下文对象与它所属的ChannelPipeline交互,向上或者向下传递事件,或者修改pipeline都是通过上下文对象

    ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除

    handler api
    Handler示例
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.nio.charset.Charset;
    import java.util.Arrays;
    
    /**
     * Handler implementation for the echo server.
     */
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("收到数据:" + ((ByteBuf)msg).toString(Charset.defaultCharset()));
            ctx.write(Unpooled.wrappedBuffer("98877".getBytes()));
            // ((ByteBuf) msg).release();
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    Handler的执行分析
    Handler执行顺序.png
    registered入站事件的处理
    registered入站事件的处理
    bind出站事件的处理
    bind出站事件处理

    pipeline分析的关键4要素

    1. 什么事件
    2. 有哪些处理器
    3. 哪些会被触发
    4. 执行顺序

    Netty 中的ByteBuf

    JDK ByteBuffer的缺点:无法动态扩容,API使用复杂

    ByteBuf增强
    • API操作便捷性
    • 动态扩容
    • 多种ByteBuf实现
    • 高效的零拷贝机制
    ByteBuf 三个重要的属性
    • capacity 容量
    • readerIndex 读取位置
    • writerIndex 写入位置

    相比JDK中的一个指针,Netty中的ByteBuf提供了两个指针来支出顺序读和写操作

    示例
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.junit.Test;
    
    import java.util.Arrays;
    
    /**
     * bytebuf的常规API操作示例
     */
    public class ByteBufDemo {
        @Test
        public void apiTest() {
            //  +-------------------+------------------+------------------+
            //  | discardable bytes |  readable bytes  |  writable bytes  |
            //  |                   |     (CONTENT)    |                  |
            //  +-------------------+------------------+------------------+
            //  |                   |                  |                  |
            //  0      <=       readerIndex   <=   writerIndex    <=    capacity
    
            // 1.创建一个非池化的ByteBuf,大小为10个字节
            ByteBuf buf = Unpooled.buffer(10);
            System.out.println("原始ByteBuf为====================>" + buf.toString());
            System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 2.写入一段内容
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 3.读取一段内容
            byte b1 = buf.readByte();
            byte b2 = buf.readByte();
            System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
            System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 4.将读取的内容丢弃
            buf.discardReadBytes();
            System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
            System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 5.清空读写指针
            buf.clear();
            System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
            System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 6.再次写入一段内容,比第一段内容少
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
    
            // 7.将ByteBuf清零
            buf.setZero(0, buf.capacity());
            System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
            System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
    
            // 8.再次写入一段超过容量的内容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
            buf.writeBytes(bytes3);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
            //  随机访问索引 getByte
            //  顺序读 read*
            //  顺序写 write*
            //  清除已读内容 discardReadBytes
            //  清除缓冲区 clear
            //  搜索操作
            //  标记和重置
            //  完整代码示例:参考
            // 搜索操作 读取指定位置 buf.getByte(1);
            //
        }
    }
    

    ByteBuf动态扩容
    capacity默认值:256字节、最大值:Integer.MAX_VALUE()
    容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)
    capacity的最小只要求,对应两套计算方案

    • 没超过4兆,从64字节开始,每次增加一倍,直至计算出来的新容量满足最小要求
    • 超过4兆,新容量=新容量最小要求 / 4兆 * 4兆 + 4兆

    选择适合的ByteBuf实现

    • Netty默认使用 堆外内存 pool 中 unsafe方式
    • 如果自己使用,它推荐使用堆内 unpoll 的safe方式
    bytebuf实现

    Unsafe

    unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe
    Java语言中很重要的特性是一次编写到处运行,所以它针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常

    unsafe 可操作内容

    PooledByteBuf对象、内存复用

    PoolThreadCache: PooledByteBufAllocator实例维护的一个线程变量。
    多种分类的MemoryRegionCache数组作用内存缓存,MemoryRegionCache内部是链表,队里里面存Chunk
    PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory

    PooledByteBufAllocator.ioBuff运作过程:
    PooledByteBuf

    零拷贝机制

    Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联

    • 将多个ByteBuf合并成为一个逻辑上的ByteBuf,内存中原始数据不变
    • 将byte[]数组包装成ByteBuf对象
    • 将一个ByteBuf切分成多个对象
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.CompositeByteBuf;
    import io.netty.buffer.Unpooled;
    
    import java.nio.charset.Charset;
    
    /**
     * 零拷贝示例
     */
    public class ZeroCopyTest {
    
        // 包装
        @org.junit.Test
        public void wrapTest() {
            byte[] arr = {1, 2, 3, 4, 5};
            ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
            System.out.println(byteBuf.getByte(4));
            arr[4] = 6;
            System.out.println(byteBuf.getByte(4));
        }
    
      // 拆分
        @org.junit.Test
        public void sliceTest() {
            ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
            ByteBuf newBuffer = buffer1.slice(1, 2);
            newBuffer.unwrap();
            System.out.println(newBuffer.toString());
        }
    
        // 合并
        @org.junit.Test
        public void compositeTest() {
            ByteBuf buffer1 = Unpooled.buffer(3);
            buffer1.writeByte(1);
            ByteBuf buffer2 = Unpooled.buffer(3);
            buffer2.writeByte(4);
            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
            CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
            System.out.println(newBuffer);
        }
    }
    

    相关文章

      网友评论

          本文标题:Netty框架

          本文链接:https://www.haomeiwen.com/subject/tbvtxhtx.html