美文网首页
Netty学习笔记

Netty学习笔记

作者: MccreeFei | 来源:发表于2018-10-31 17:21 被阅读0次

    NIO

    public class NIOServer {
    
        /*标识数字*/
        private  int flag = 0;
        /*缓冲区大小*/
        private  int BLOCK = 4096;
        /*接受数据缓冲区*/
        private  ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
        /*发送数据缓冲区*/
        private  ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
        private  Selector selector;
    
        public NIOServer(int port) throws IOException {
            // 打开服务器套接字通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 服务器配置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 检索与此通道关联的服务器套接字
            ServerSocket serverSocket = serverSocketChannel.socket();
            // 进行服务的绑定
            serverSocket.bind(new InetSocketAddress(port));
            // 通过open()方法找到Selector
            selector = Selector.open();
            // 注册到selector,等待连接
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server Start----8888:");
        }
    
    
        // 监听
        private void listen() throws IOException {
            while (true) {
                // 阻塞 直到有事件到达
                selector.select();
                // 返回此选择器的已选择键集。
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    handleKey(selectionKey);
                }
            }
        }
    
        // 处理请求
        private void handleKey(SelectionKey selectionKey) throws IOException {
            // 接受请求
            ServerSocketChannel server = null;
            SocketChannel client = null;
            String receiveText;
            String sendText;
            int count=0;
            // 测试此键的通道是否已准备好接受新的套接字连接。
            if (selectionKey.isAcceptable()) {
                // 返回为之创建此键的通道。
                server = (ServerSocketChannel) selectionKey.channel();
                // 接受到此通道套接字的连接。
                // 此方法返回的套接字通道(如果有)将处于阻塞模式。
                client = server.accept();
                // 配置为非阻塞
                client.configureBlocking(false);
                // 注册到selector,等待连接
                client.register(selector, SelectionKey.OP_READ);
            } else if (selectionKey.isReadable()) {
                // 返回为之创建此键的通道。
                client = (SocketChannel) selectionKey.channel();
                //将缓冲区清空以备下次读取
                receivebuffer.clear();
                //读取服务器发送来的数据到缓冲区中
                count = client.read(receivebuffer);
                if (count > 0) {
                    receiveText = new String( receivebuffer.array(),0,count);
                    System.out.println("服务器端接受客户端数据--:"+receiveText);
                    client.register(selector, SelectionKey.OP_WRITE);
                }
            } else if (selectionKey.isWritable()) {
                //将缓冲区清空以备下次写入
                sendbuffer.clear();
                // 返回为之创建此键的通道。
                client = (SocketChannel) selectionKey.channel();
                sendText="message from server--" + flag++;
                //向缓冲区中输入数据
                sendbuffer.put(sendText.getBytes());
                //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
                sendbuffer.flip();
                //输出到通道
                client.write(sendbuffer);
                System.out.println("服务器端向客户端发送数据--:"+sendText);
                client.register(selector, SelectionKey.OP_READ);
            }
        }
    
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            int port = 8888;
            NIOServer server = new NIOServer(port);
            server.listen();
        }
    }
    
    public class NIOClient {
    
        /*标识数字*/
        private static int flag = 0;
        /*缓冲区大小*/
        private static int BLOCK = 4096;
        /*接受数据缓冲区*/
        private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
        /*发送数据缓冲区*/
        private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
        /*服务器端地址*/
        private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
                "localhost", 8888);
    
        public static void main(String[] args) throws IOException {
            // 打开socket通道
            SocketChannel socketChannel = SocketChannel.open();
            // 设置为非阻塞方式
            socketChannel.configureBlocking(false);
            // 打开选择器
            Selector selector = Selector.open();
            // 注册连接服务端socket动作
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            // 连接
            socketChannel.connect(SERVER_ADDRESS);
            // 分配缓冲区大小内存
    
            Set<SelectionKey> selectionKeys;
            Iterator<SelectionKey> iterator;
            SelectionKey selectionKey;
            SocketChannel client;
            String receiveText;
            String sendText;
            int count=0;
    
            while (true) {
                //选择一组键,其相应的通道已为 I/O 操作准备就绪。
                //此方法执行处于阻塞模式的选择操作。
                selector.select();
                //返回此选择器的已选择键集。
                selectionKeys = selector.selectedKeys();
                //System.out.println(selectionKeys.size());
                iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    selectionKey = iterator.next();
                    if (selectionKey.isConnectable()) {
                        System.out.println("client connect");
                        client = (SocketChannel) selectionKey.channel();
                        // 判断此通道上是否正在进行连接操作。
                        // 完成套接字通道的连接过程。
                        if (client.isConnectionPending()) {
                            client.finishConnect();
                            System.out.println("完成连接!");
                            sendbuffer.clear();
                            sendbuffer.put("Hello,Server".getBytes());
                            sendbuffer.flip();
                            client.write(sendbuffer);
                        }
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {
                        client = (SocketChannel) selectionKey.channel();
                        //将缓冲区清空以备下次读取
                        receivebuffer.clear();
                        //读取服务器发送来的数据到缓冲区中
                        count=client.read(receivebuffer);
                        if(count>0){
                            receiveText = new String( receivebuffer.array(),0,count);
                            System.out.println("客户端接受服务器端数据--:"+receiveText);
                            client.register(selector, SelectionKey.OP_WRITE);
                        }
    
                    } else if (selectionKey.isWritable()) {
                        sendbuffer.clear();
                        client = (SocketChannel) selectionKey.channel();
                        sendText = "message from client--" + (flag++);
                        sendbuffer.put(sendText.getBytes());
                        //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
                        sendbuffer.flip();
                        client.write(sendbuffer);
                        System.out.println("客户端向服务器端发送数据--:"+sendText);
                        client.register(selector, SelectionKey.OP_READ);
                    }
                }
                selectionKeys.clear();
            }
        }
    }
    

    Buffer

    NIO ByteBuffer

    //重要属性
    private int position;
    private int limit;
    private int capacity;
    

    写模式时 position指向当前待写的索引处,limitcapacity都指向数组索引最大值+1处

    写模式
    读模式 需要调用flip()方法
      public final Buffer flip() {
            limit = position;
            position = 0;
            mark = -1;
            return this;
        }
    

    position置为0,limit置为写模式下postion,标记最大可读位置。capacity不变表示此buffer内存数组最大容量。

    读模式

    Netty ByteBuf

    Neety ByteBuf不同于NIO ByteBuffer,ByteBuffer读写模式使用同一套索引,转换需要调用flip。而ByteBuf同时维护了两个索引:读索引和写索引,readIndexwriteIndex。有三种种类的buffer,堆缓冲/直接缓冲/混合缓冲

    heap buffers

    即在JVM堆中分配的字节数组,最普遍的缓冲形式,分配释放也较为廉价,一般使用方式:


    head buffers

    direct buffers

    直接缓存由JVM通过本地调用在对外进行分配,分配释放比heap buffers更为昂贵,但是效率高省去了一次拷贝,因为堆内存在socket中进行传输之前,都需要进行一次heap buffer 到 direct buffer的拷贝,使用方式:


    direct buffers

    composite buffers

    对各种ByteBuf的组合使用,相当于ByteBuf的容器,使用方式:


    composite buffers

    引用计数 refCount

    ByteBuf采用引用计数,ByteBuf被分配后reCount=1,调用retain()方法refCount+1,调用release()方法refCount-1.当refCount==0时代表ByeBuf生命周期已过,会被释放或者归池。调用refCount==0的ByteBuf将会抛出IllegalReferenceCountException。需要注意的是有些过程是netty内部帮助release的,比如SimpleChannelInboundHandler

     @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    

    默认构造器实现类autoRelease为true,channelRead0模板方法由用户进行实现,处理信息,处理完成后可以看到netty帮助释放了。
    ByteBuf拷贝,copy()进行深拷贝即内部字节数组也会即你想嗯拷贝,duplicate()进行浅拷贝,只对readIndex和writeIndex进行拷贝,但共用同一个字节数组。同样readSlice()浅拷贝,readBytes()深拷贝。

    Netty

    Netty服务端典型代码:

    final EchoServerHandler serverHandler = new EchoServerHandler();
            EventLoopGroup boosGroup = new NioEventLoopGroup(1);
            EventLoopGroup workGroup = new NioEventLoopGroup(3);
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(boosGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(serverHandler);
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture f = b.bind().sync();
                System.out.println(EchoServer.class.getName() +
                        " started and listening for connections on " + f.channel().localAddress());
                f.channel().closeFuture().sync();
            } finally {
                boosGroup.shutdownGracefully().sync();
                workGroup.shutdownGracefully().sync();
            }
    

    ServerSocketChannel初始化

    ServerBootstrap中进行Channel等初始化工作实在bind()方法中进行的,一系列调用链后在initAndRegister()方法中进行ServerSocketChannel的初始化工作。

     final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    

    这里重要的是channel实例化和channel初始化两部操作,channelFactory.newChannel()使用Channel的反射工厂生成一个Channel实例。

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
        /**
         * Create a new instance
         */
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
        /**
         * Create a new instance using the given {@link SelectorProvider}.
         */
        public NioServerSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
        /**
         * Create a new instance using the given {@link ServerSocketChannel}.
         */
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    使用默认provider构建了一个java nio的ServerSocketChannel实例,后再调用重载构造方法,设置感兴趣事件OP_ACCEPT用于监听客户端连接事件,构造器一直调用上层构造器,其中在父类AbstractCHannel中实例化Unsafe和Pipeline,Unsafe封装了Java底层的Socket操作,实际上是沟通 Netty 上层和 Java 底层的重要的桥梁。
    实例化过程:

    • 调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel

    • AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:

      • parent 属性置为 null

      • unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类

      • pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.

    • AbstractNioChannel 中的属性:

      • SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.

      • readInterestOp 被设置为 SelectionKey.OP_ACCEPT

      • SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)

    • NioServerSocketChannel 中的属性:

      • ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())

    接下来是Channel的初始化工作:

    @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    init()方法中首先设置配置Options和Attributes,重点在于最后向Pipeline中添加一个ServerBootstrapAcceptor,ServerBootStrapAcceptor继承自ChannelInboundHandlerAdapter,核心是其中的channelRead()方法

    @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    当有客户端连接时,连接后的SocketChannel会有Pipeline进行链式调用,于是就会调用该channelRead方法,首先将ServerBootStrap中配置的childHanlder添加到SocketChannel的Pipeline中,然后设置SocketChannel的Options和Attributes,最后将SocketChannel注册到workGroup,绑定到一个EventLoop上,完成整个ServerChannel初始化工作。

    EventLoopGroup

    image

    EventLoopGroup相当于线程池,EventLoop相当于线程池中的线程用于处理相关的IO事件。Server拥有两个线程池,bossGroup用于接受客户端连接事件,workGroup用于处理连接后SocketChannel产生的IO事件。一旦SocketChannel就绪就会在workGroup中进行注册,平均注册到一个EventLoop,此后该Channel就会与该EventLoop绑定,一个Channel对应一个EventLoop,但一个EventLoop可以对应多个Channel。

    ChannelHandler

    ChannelHandler为Netty消息的处理器,分为InboundChannelHandler和OutbountChannelHandler,对于Channel的read消息由InbountChannelHandler处理,write消息由OutbountChannelHandler处理。常见的几种处理器有ChannelInitializer是InbountChannelHandler,一般用于向Pipline中添加其他Handler,只会调用一次,完成后就会从Pipline中移除该Handler。Decoder和Encoder解码和编码器,Decoder为InbountHandler,Encoder为OutbountHandler。

    ChannelPipline

    一个Channel对应了一个CahnnelPipeline,维护保存了ChannelHandler的链式结构,具体如图:


    ChannelPipeline

    ChannelContext

    ChannelContext是对ChannelHandler的封装,在ChannelHandler添加进Pipeline时生成,其实Pipeline中的链式结构保存的是ChannelContext的双向链表,ChannelHandler是不知道所在Pipeline中前后ChannelHandler的,但ChannelContext保存有前后ChannelContext的索引。ChannelContext调用下一个处理器相关方法是以fire为开头的,比如context.fireChannelRead()即调用下一个处理器的channelRead操作。
    还有需要注意的是pipeline和channel调用write()都是从第一个outbount开始链式处理,但context调用write()是从当前context的下一个outbount开始链式处理。

    //channel
       @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    //pipeline
        @Override
        public final ChannelFuture write(Object msg) {
            return tail.write(msg);
        }
    //context
            private void write(Object msg, boolean flush, ChannelPromise promise) {
                AbstractChannelHandlerContext next = findContextOutbound();
                final Object m = pipeline.touch(msg, next);
            ........
        }
         private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    

    相关文章

      网友评论

          本文标题:Netty学习笔记

          本文链接:https://www.haomeiwen.com/subject/fyoatqtx.html