美文网首页Java
Netty原理与基础(一)

Netty原理与基础(一)

作者: smallmartial | 来源:发表于2020-10-08 18:10 被阅读0次

    1.简介

    Netty是为了快速开发可维护的高性能、高可扩展、网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。换句话说,Netty是一个Java NIO客户端/服务器框架

    2.Netty目标

    • 使开发可以做到“快速和轻松”
    • 做到高性能和高扩展

    3.创建第一个Netty项目

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>${netty.version}</version>
            </dependency>
    
    public class NettyDiscardServer {
        private final int serverPort;
        ServerBootstrap b = new ServerBootstrap();
    
        public NettyDiscardServer(int port) {
            this.serverPort = port;
        }
    
        public void runServer() {
            //创建reactor 线程组
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
    
            try {
                //1 设置reactor 线程组
                b.group(bossLoopGroup, workerLoopGroup);
                //2 设置nio类型的channel
                b.channel(NioServerSocketChannel.class);
                //3 设置监听端口
                b.localAddress(serverPort);
                //4 设置通道的参数
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
                b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
                //5 装配子通道流水线
                b.childHandler(new ChannelInitializer<SocketChannel>() {
                    //有连接到达时会创建一个channel
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // pipeline管理子通道channel中的Handler
                        // 向子channel流水线添加一个handler处理器
                        ch.pipeline().addLast(new NettyDiscardHandler());
                    }
                });
                // 6 开始绑定server
                // 通过调用sync同步方法阻塞直到绑定成功
                ChannelFuture channelFuture = b.bind().sync();
                Logger.info(" 服务器启动成功,监听端口: " +
                        channelFuture.channel().localAddress());
    
                // 7 等待通道关闭的异步任务结束
                // 服务监听通道会一直等待通道关闭的异步任务结束
                ChannelFuture closeFuture = channelFuture.channel().closeFuture();
                closeFuture.sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 8 优雅关闭EventLoopGroup,
                // 释放掉所有资源包括创建的线程
                workerLoopGroup.shutdownGracefully();
                bossLoopGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws InterruptedException {
            int port = NettyDemoConfig.SOCKET_SERVER_PORT;
            new NettyDiscardServer(port).runServer();
        }
    }
    
    • 客户端代码:
    public class EchoClient {
    
        public void start() throws IOException {
    
            InetSocketAddress address =
                    new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                            NioDemoConfig.SOCKET_SERVER_PORT);
    
            // 1、获取通道(channel)
            SocketChannel socketChannel = SocketChannel.open(address);
            // 2、切换成非阻塞模式
            socketChannel.configureBlocking(false);
            //不断的自旋、等待连接完成,或者做一些其他的事情
            while (!socketChannel.finishConnect()) {
    
            }
            Print.tcfo("客户端启动成功!");
    
            //启动接受线程
            Processer processer = new Processer(socketChannel);
            new Thread(processer).start();
    
        }
    
        static class Processer implements Runnable {
            final Selector selector;
            final SocketChannel channel;
    
            Processer(SocketChannel channel) throws IOException {
                //Reactor初始化
                selector = Selector.open();
                this.channel = channel;
                channel.register(selector,
                        SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            }
    
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        selector.select();
                        Set<SelectionKey> selected = selector.selectedKeys();
                        Iterator<SelectionKey> it = selected.iterator();
                        while (it.hasNext()) {
                            SelectionKey sk = it.next();
                            if (sk.isWritable()) {
                                ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
    
                                Scanner scanner = new Scanner(System.in);
                                Print.tcfo("请输入发送内容:");
                                if (scanner.hasNext()) {
                                    SocketChannel socketChannel = (SocketChannel) sk.channel();
                                    String next = scanner.next();
                                    buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
                                    buffer.flip();
                                    // 操作三:通过DatagramChannel数据报通道发送数据
                                    socketChannel.write(buffer);
                                    buffer.clear();
                                }
    
                            }
                            if (sk.isReadable()) {
                                // 若选择键的IO事件是“可读”事件,读取数据
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
    
                                //读取数据
                                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                int length = 0;
                                while ((length = socketChannel.read(byteBuffer)) > 0) {
                                    byteBuffer.flip();
                                    Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
                                    byteBuffer.clear();
                                }
    
                            }
                            //处理结束了, 这里不能关闭select key,需要重复使用
                            //selectionKey.cancel();
                        }
                        selected.clear();
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            new EchoClient().start();
        }
    }
    
    

    4.Reactor反应器模式

    4.1Reactor反应器模式中IO事件的处理流程

    image.png
    • 1.通道注册
    • 2.查询选择
    • 3.事件分发
    • 4.io操作和业务处理

    4.2Netty中的Channel通道组件

    • 反应器模式和通道紧密相关,反应器的查询和分发的IO事件都来自于Channel通道组件。
    • Netty中不直接使用Java NIO的Channel通道组件,对Channel通道组件进行了自己的封装。
    • Netty还能处理Java的面向流的OIO(Old-IO,即传统的阻塞式IO)
    • Netty中的每一种协议的通道,都有NIO(异步IO)和OIO(阻塞式IO)两个版本


      image.png

    4.3Netty中的Reactor反应器

    • Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为:NioEventLoop。
    • NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。

    4.4 Netty中的Handler处理器

    Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口

    4.5Netty的流水性模式

    • (1)反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。
    • (2)通道和Handler处理器实例之间,是多对多的关系:一个通道的IO事件被多个的Handler实例处理;一个Handler处理器实例也能绑定到很多的通道,处理多个通道的IO事件。
    • Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。

    相关文章

      网友评论

        本文标题:Netty原理与基础(一)

        本文链接:https://www.haomeiwen.com/subject/pfjhpktx.html