美文网首页
Netty之Reactor模型

Netty之Reactor模型

作者: Balram | 来源:发表于2021-03-21 21:24 被阅读0次

    Java NIO几个组件

    简单介绍几个组件的概念,并不深入。通过一个简单的例子来说明组件如何搭配使用

    Buffer

    NIO的Buffer(缓冲区)本质是一块内存块,可以写入数据,也可以从中读取数据。

    NIO的Buffer类,是一个抽象类,内部是一个内存块(数组),相比数组不同的是,提供了一组更加有效的方法,用来进行写入和读取的交替访问。

    Channel

    从广泛层面来说,一个通道可以表示一个底层的文件描述符,如:硬件设备、文件、网络连接。最重要的4种Channel实现

    1)FileChannel:文件通道,用于文件的数据读写

    2)SocketChannel:套接字通道,用于Socket套接字TCP连接的数据读写。

    3)ServerSocketChannel:服务器套接字通道,允许监听TCP连接请求,为每个监听的请求,创建一个SocketChannel套接字通道。

    4)DatagramChannel:数据报通道,用于UDP协议的数据读写。

    Selector

    Selector(选择器)是一种特殊的组件,用于采集各个通道的状态(或者说事件),先将通道注册到选择器,并设置好关心的事件,然后就额可以通过调用select方法,等待事件发生。

    选择器的使命是:完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监听多个通道的IO(输入、输出)状况。选择器和通道的关系,是监控和被监控的关系。

    一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。

    通道和选择器之间的关系,通过register(注册)的方式完成,可以将通道实例注册到一个选择器中。register有两个参数:第一个是选择器,第二个是要监控的IO事件类型。

    通道有4个事件可以监听:

    1)Accept: 可以接受的连接。SelectionKey.OP_ACCEPT

    2)Connect:连接成功。SelectionKey.OP_CONNECT

    3)Read:有数据可读。SelectionKey.OP_READ

    4)Write:有数据可写。SelectionKey.OP_WRITE

    SelectionKey

    通道和选择器的监控关系,在注册成功后,就可以选择就绪事件。通过选择器的select方法来完成。

    SelectionKey选择键,就是被选择器选中的IO事件。一个IO事件发生,之前在选择器中注册过,就会被选择器选中,并放入到SelectionKey选择集中。

    Discard例子

    简单的例子如下,具体可见注释

    public static void startServer() throws IOException {
        //获取选择器
        Selector selector = Selector.open();
        //获取ServerSocketChannel通道
        ServerSocketChannel server = ServerSocketChannel.open();
        //设置为非阻塞
        server.configureBlocking(false);
        //绑定端口
        server.bind(new InetSocketAddress(10101));
        //将通道注册到选择器,并监听ACCEPT事件
        server.register(selector, SelectionKey.OP_ACCEPT);
    
        while (true) {
            //轮询感兴趣的IO就绪事件,阻塞方法
            selector.select();
            //获取选择键组合
            Iterator<SelectionKey> selectKeys = selector.selectedKeys().iterator();
            while (selectKeys.hasNext()) {
                //取单个选择键
                SelectionKey selectionKey = selectKeys.next();
                //如果IO事件,是连接就绪事件
                if (selectionKey.isAcceptable()) {
                    //获取客户端连接
                    SocketChannel client = server.accept();
                    //设置为非阻塞模式
                    client.configureBlocking(false);
                    //将通道注册到选择器,事件为:可读事件
                    client.register(selector, SelectionKey.OP_READ);
    
                    //如果IO事件是可读事件
                } else if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = 0;
                    //读取事件,然后丢弃
                    while ((len = channel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, len));
                        buffer.clear();
                    }
                    channel.close();
                }
                //移除选择键,否则会重复
                selectKeys.remove();
            }
        }
    }
    

    Reactor三种模型

    单线程模型

    image

    工作流程:

    只有一个select循环接收请求,客户端注册进来,由Reactor接收注册事件,然后由Reactor分发出去,再有Handler处理。

    特点:

    主要有一个Handler方法阻塞了,就会导致所有的client的Handler阻塞,也会导致注册事件无法处理,如法接收新请求,这种模式用的比较少,不能充分利用多核的资源。

    Echo Server例子:

    public class Reactor implements Runnable {
    
        Selector selector;
        ServerSocketChannel serverSocket;
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new Reactor());
            thread.start();
            thread.join();
        }
    
        public Reactor() {
            try {
                selector = Selector.open();
                serverSocket = ServerSocketChannel.open();
                serverSocket.socket().bind(new InetSocketAddress(9090));
                serverSocket.configureBlocking(false);
    
                SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
                selectionKey.attach(new Acceptor());
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                }
            } catch (IOException e) {
    
            }
        }
    
        void dispatch(SelectionKey key) {
            Runnable handler = (Runnable) key.attachment();
            if (handler != null) {
                handler.run();
            }
        }
    
        class Acceptor implements Runnable {
    
            @Override
            public void run() {
                SocketChannel client = null;
                try {
                    client = serverSocket.accept();
                    if (client != null) {
                        new Handler(selector, client);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Handler implements Runnable {
            SocketChannel socket;
            SelectionKey sk;
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            final static int READING = 0, SENDING = 1;
            int state = READING;
    
            public Handler(Selector selector, SocketChannel c) {
                try {
                    socket = c;
                    c.configureBlocking(false);
                    sk = socket.register(selector, 0);
                    sk.attach(this);
                    sk.interestOps(SelectionKey.OP_READ);
                    selector.wakeup();
                } catch (IOException ignore) {
                    ignore.printStackTrace();
                }
            }
    
            @Override
            public void run() {
                try {
                    if (state == READING) {
                        int length = 0;
                        while ((length = socket.read(buffer)) > 0) {
                            System.out.println(new String(buffer.array(), 0, length));
                        }
                        buffer.flip();
                        sk.interestOps(SelectionKey.OP_WRITE);
                        state = SENDING;
                    } else if (state == SENDING) {
    
                        socket.write(buffer);
    
                        buffer.clear();
                        sk.interestOps(SelectionKey.OP_READ);
                        state = READING;
    
                    }
                } catch (IOException e) {
    
                }
            }
        }
    
    }
    

    多线程模型

    image

    工作流程:

    注册接收事件都是由Reactor来处理,其他计算、编解码等处理都是由线程池来处理。从图中可以看出工作线程是多线程的,监听注册事件Reactor还是单线程。

    特点:

    在Handler读写处理时,交给工作线程处理,不会导致Reactor无法执行,Reactor分发和Handler处理时分开的,所以能充分利用资源

    缺点:

    Reactor只在主线程中运行,承担所有事件的监听和相应,如果短时间的高并发场景下,依然会造成性能瓶颈。

    Echo Server例子:

    public class MultiThreadReactor implements Runnable {
        Selector selector;
        ServerSocketChannel serverSocket;
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new Reactor());
            thread.start();
            thread.join();
        }
    
        public MultiThreadReactor() {
            try {
                selector = Selector.open();
                serverSocket = ServerSocketChannel.open();
                serverSocket.socket().bind(new InetSocketAddress(9090));
                serverSocket.configureBlocking(false);
    
                SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
                selectionKey.attach(new MultiThreadReactor.Acceptor());
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        Runnable handler = (Runnable) it.next().attachment();
                        if (handler != null) {
                            handler.run();
                        }
                    }
                    selected.clear();
                }
            } catch (IOException e) {
    
            }
        }
    
        class Acceptor implements Runnable {
    
            @Override
            public void run() {
                SocketChannel client = null;
                try {
                    client = serverSocket.accept();
                    if (client != null) {
                        new MultiThreadReactor.Handler(selector, client);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Handler implements Runnable {
            SocketChannel socket;
            SelectionKey sk;
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            final static int READING = 0, SENDING = 1;
            int state = READING;
            //避免重复创建
            static ExecutorService pool = Executors.newFixedThreadPool(4);
    
            public Handler(Selector selector, SocketChannel c) {
                try {
                    socket = c;
                    c.configureBlocking(false);
                    sk = socket.register(selector, 0);
                    sk.attach(this);
                    sk.interestOps(SelectionKey.OP_READ);
                    selector.wakeup();
                } catch (IOException ignore) {
                    ignore.printStackTrace();
                }
            }
    
            @Override
            public void run() {
                pool.execute(() -> {
                    try {
                        if (state == READING) {
                            int length = 0;
                            while ((length = socket.read(buffer)) > 0) {
                                System.out.println(new String(buffer.array(), 0, length));
                            }
                            buffer.flip();
                            sk.interestOps(SelectionKey.OP_WRITE);
                            state = SENDING;
                        } else if (state == SENDING) {
    
                            socket.write(buffer);
    
                            buffer.clear();
                            sk.interestOps(SelectionKey.OP_READ);
                            state = READING;
    
                        }
                    } catch (IOException e) {
    
                    }
                });
            }
        }
    }
    

    主从模型

    image

    工作流程:

    mainReactor负责监听客户端请求,专门处理新连接的建立,将建立好的连接注册到subReactor

    subReactor将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的Handler处理。

    特点:

    mainReactor 主要是用来处理客户端请求连接建立的操作。 subReactor主要做和建立起来的连接做数据交互和事件业务处理操作,每个subReactor一个线程来处理。这样的模型,使得每个模块更加专一,耦合度更低,支持更高的并发量。

    Echo Server例子:

    public class MultiReactors {
    
        ServerSocketChannel serverSocket;
        Selector mainSelect;
        Selector[] selectors = new Selector[2];
        Reactor mainReactor = null;
        Reactor[] subReactors = null;
        AtomicInteger step = new AtomicInteger(0);
    
        public static void main(String[] args) throws InterruptedException {
            MultiReactors multiEchoHandler = new MultiReactors();
            multiEchoHandler.startService();
        }
        private void startService() throws InterruptedException {
            new Thread(mainReactor).start();
            new Thread(subReactors[0]).start();
            Thread thread = new Thread(subReactors[1]);
            thread.start();
            thread.join();
        }
    
        public MultiReactors() {
            try {
                mainSelect = Selector.open();
    
                selectors[0] = Selector.open();
                selectors[1] = Selector.open();
                serverSocket = ServerSocketChannel.open();
                serverSocket.socket().bind(new InetSocketAddress(9090));
                serverSocket.configureBlocking(false);
    
                //第一个选择器,负责监控新连接事件
                SelectionKey selectionKey = serverSocket.register(mainSelect, SelectionKey.OP_ACCEPT);
                selectionKey.attach(new MultiReactors.Acceptor());
    
                mainReactor = new Reactor(mainSelect);
    
                Reactor subReactor1 = new Reactor(selectors[0]);
                Reactor subReactor2 = new Reactor(selectors[1]);
                subReactors = new Reactor[]{subReactor1,subReactor2};
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        class Reactor implements Runnable {
            final Selector selector;
    
            public Reactor(Selector selector) {
                this.selector = selector;
            }
    
            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        selector.select();
                        Set<SelectionKey> keys = selector.selectedKeys();
                        Iterator<SelectionKey> it = keys.iterator();
                        while (it.hasNext()) {
                            Runnable handler = (Runnable) it.next().attachment();
                            if (handler != null) {
                                handler.run();
                            }
                        }
                        keys.clear();
                    }
                } catch (IOException e) {
    
                }
            }
    
        }
    
    
    
    
        class Acceptor implements Runnable {
    
            @Override
            public void run() {
                SocketChannel client = null;
                try {
                    client = serverSocket.accept();
                    if (client != null) {
                        Selector selector = selectors[step.getAndIncrement() % selectors.length];
                        new MultiReactors.MultiEchoHandler(selector, client);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class MultiEchoHandler implements Runnable {
            SocketChannel socket;
            SelectionKey sk;
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            final static int READING = 0, SENDING = 1;
            int state = READING;
            //避免重复创建
            static ExecutorService pool = Executors.newFixedThreadPool(4);
    
            public MultiEchoHandler(Selector selector, SocketChannel c) {
    
                try {
                    /**
                     * 如果没有此代码,发现会无法注册,
                     * 这是因为从Reactor 目前正阻塞在select()方法上,
                     * 此方法锁定了publicKeys(已注册的key),直接注册会造成死锁,
                     * 通过调用wakeup,有可能还没注册成功又阻塞了。这是一个多线程同步的问题
                     */
                    selector.wakeup();
    
                    socket = c;
                    c.configureBlocking(false);
                    sk = socket.register(selector, SelectionKey.OP_READ);
                    sk.attach(this);
    //                sk.interestOps(SelectionKey.OP_READ);
                    selector.wakeup();
                } catch (IOException ignore) {
                    ignore.printStackTrace();
                }
            }
    
            @Override
            public void run() {
                //提交到线程池中执行
                pool.execute(this::doHandle);
    
            }
    
            private synchronized void doHandle(){
                try {
                    if (state == READING) {
                        int length = 0;
                        while ((length = socket.read(buffer)) > 0) {
                            System.out.println(new String(buffer.array(), 0, length));
                        }
                        buffer.flip();
                        sk.interestOps(SelectionKey.OP_WRITE);
                        state = SENDING;
                    } else if (state == SENDING) {
    
                        socket.write(buffer);
    
                        buffer.clear();
                        sk.interestOps(SelectionKey.OP_READ);
                        state = READING;
                    }
                } catch (IOException e) {
    
                }
            }
        }
    
    }
    

    Netty实现Reactor

    主要分析一下,Netty如何实现Reactor的模型,其原理图如下图,主要考虑主从Reactor的方式。

    image

    一般主从Reactor代码如下:会创建两个个NioEventLoopGroup,一个用于main,一个用于sub。

    public void start() throws InterruptedException {
        EchoServerHandler handler = new EchoServerHandler();
        //创建EventLoopGroup
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
    
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss,work)
                    //指定所使用的NIO 传输channel
                    .channel(NioServerSocketChannel.class)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
                            socketChannel.pipeline().addLast(handler);
                        }
                    });
            //异步绑定服务器,调用sync方法阻塞等待直到绑定完成
            ChannelFuture f = b.bind().sync();
            //获取closeFuture,阻塞直到完成
            f.channel().closeFuture().sync();
        }finally {
            //释放所有资源
            boss.shutdownGracefully().sync();
            work.shutdownGracefully().sync();
        }
    }
    

    根据Reactor的主从模型图,要分析Netty如何实现,就要分成几个部分

    1)mainReactor如何监听连接

    2)mainReactor监听获取到的连接,如何注册到subReactor

    3)subReactor如何监听事件。

    mainReactor如何监听连接

    从boostrapt的bind方法作为入口,内部调用会如下

    //AbstractBootstrap#doBind
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        //...省略...
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            //...省略...
            return promise;
        }
    }
    

    1)doBind内部会进行两个操作,一个是initAndRegister初始化channel和register。另外一个是doBind0(会在9的地方讲解),会进行的操作是bind一个端口。

    //AbstractBootstrap#initAndRegister
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
    
        }
       //...省略...
        ChannelFuture regFuture = config().group().register(channel);
       //...省略...
    }
    

    2)initAndRegister内部会根据具体的Channel,生成一个Channel实例,然后执行init方法。然后会执行register注册操作,register时,会通过group的register方法,选择一个EventLoop,然后进行register。

    //ServerBootstrap#init
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
    
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
    

    3)在init方法中,会把ServerBootstrapAcceptor 注册到这个channel的pipeline中。这是mainReactor把连接处理到subReactor的关键。

    //SingleThreadEventLoop#register
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    

    4)register方法具体会到SingleThreadEventLoop(NioEventLoop的父类)的register方法,具体又会到unfase里的register方法

    //AbstractChannel#register
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    
        //...省略...
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
               //...省略...
            }
        }
    }
    

    5)在AbstractChannel的register方法中,会进行chanel和eventloop进行绑定。

    //AbstractChannel$AbstractUnsafe#register0
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    //...省略...
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            // This channel was registered before and autoRead() is set. This means we need to begin read
            // again so that we process inbound data.
            //
            // See https://github.com/netty/netty/issues/4805
            beginRead();
        }
    }
    

    6)内部主要是两个处理,一个是doRegister执行底层的注册方法,另外一个是 pipeline.fireChannelActive();触发通知。

    //AbstractNioChannel#doRegister 
    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    

    7)doRegister会进行真正的注册,但是这里面并没有监听的事件。另外attachment是把当前的channel附上了。

    //NioServerSocketChannel#isActive
    public boolean isActive() {
        // As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed
        // we will also need to check if it is open.
        return isOpen() && javaChannel().socket().isBound();
    }
    

    8)AbstractChannel$AbstractUnsafe#register0方法里有个isActive方法判断,具体就会到isActive方法判断,此时还没有绑定端口,所以不会执行代码块里的内容。

    //AbstractBootstrap#doBind0
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    9)回到AbstractBootstrap#doBind0方法中,会通过EventLoop执行channel的bind方法。channel.bind方法,实际上会调用pipeline.bind()方法。pipeline.bind()方法实际上会调用tail.bind()方法。DefaultChannelPipeline$HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口,所以最后会调用DefaultChannelPipeline$HeadContext的bind方法.

    //DefaultChannelPipeline$HeadContext#bind
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }
    //AbstractChannel#bind
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
       //...省略代码....
    
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
        safeSetSuccess(promise);
    }
    //NioServerSocketChannel#doBind
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    

    10)DefaultChannelPipelineHeadContext#bind实际上会调用unsafe.bind方法,实际上就会调用AbstractChannel#bind方法。doBind方法实际上会调用NioServerSocketChannel#doBind方法,内部就是调用nio的channel进行绑定端口。最后如果是isActive的话,则会执行 pipeline.fireChannelActive();方法,pipeline内部会有head个tail,所以会执行到DefaultChannelPipelineHeadContext#channelActive方法

    //DefaultChannelPipeline$HeadContext#channelActive
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
    
        readIfIsAutoRead();
    }
    //DefaultChannelPipeline$HeadContext#readIfIsAutoRead
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
         }
     }
    //AbstractChannel#read
     public Channel read() {
          pipeline.read();
          return this;
     }
    //DefaultChannelPipeline$HeadContext#read
     public void read(ChannelHandlerContext ctx) {
          unsafe.beginRead();
     }
    

    11)HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口。当channelActive的时候,会触发channel.read(),而channel实际上会触发pipeline的read(),最后实际上会到HeadContext的read()方法(实现了ChannelOutboundHandler接口)

    //AbstractChannel#beginRead
    public final void beginRead() {
        //...省略...
        try {
            doBeginRead();
        } catch (final Exception e) {
          //...省略...
        }
    }
    //AbstractNioChannel#doBeginRead
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    12)NIO为例,最后可以发现,在AbstractNioChannel的doBeginRead的地方进行了注册,把实际感兴趣的信息注册上去。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
    

    13)NioServerSocketChannel和NioSocketChannel关注不一样的事件,NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。

    小结:虽然bootstap.bind()方法调用很简单,但是内部却很复杂。初始化了所需要的用到的组件,channel,pipeline,绑定了eventloop等,另外把channel注册到了选择器上。绑定了本地端口,并监听了ACCEPT事件。eventloop内部就是通过选择器的select进行轮询获取事件,然后将事件丢到subReactor上。

    连接注册到subReactor

    以NioEventLoop为例,内部会有个Selector,然后有个循环不断的select()事件。抛弃一些细节,之间看主要内容。

    每个channel内部都有一个unsafe,简单记忆一下:

    NioServerSocketChannel的是AbstractNioMessageChannel$NioMessageUnsafe

    NioSocketChannel的是AbstractNioByteChannel$NioByteUnsafe

    //NioEventLoop#processSelectedKey
    int readyOps = k.readyOps();
    // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
    // the NIO JDK channel implementation may throw a NotYetConnectedException.
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
    
        unsafe.finishConnect();
    }
    
    // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }
    
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    //读事件和接收链接事件
    //1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
    //2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
    

    1)各种事件判断都在这里,由于NioServerSocketChannel会订阅SelectionKey.OP_ACCEPT事件,所以会触发NioMessageUnsafe.read()方法

    //NioServerSocketChannel#doReadMessages
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
    
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
    
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
    
        return 0;
    }
    

    2)NioMessageUnsafe.read()内部会调用doReadMessages方法,NioServerSocketChannel的read方法,就是从accept中获取SocketChannel对象。

    //AbstractNioMessageChannel$NioMessageUnsafe#read
    //..省略代码
    try {
        do {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }
    
            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    } catch (Throwable t) {
        exception = t;
    }
    
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    //..省略代码
    

    3)doReadMessages调用完后,会执行pipeline.fireChannelRead方法,会把SocketChannel对象给传递出去

    subReactor如何监听事件

    之前其实有提到ServerBootstrap会把ServerBootstrapAcceptor这个handler加入到server的handler里面,所以,会触发ServerBootstrapAcceptor的channelRead方法。

    //ServerBootstrap$ServerBootstrapAcceptor#channelRead
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);
    
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    

    1)读取到的channel实际上就是NioSocketChannel了。此时会进行childGroup的register。刚刚分析的了boosGroup的register,实际上他们的流程都是通用的,只是有些具体的实现类不一样。NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。

    //AbstractChannel#register0
    private void register0(ChannelPromise promise) {
        try {
            //...省略代码...
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
           //...省略代码...
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
           //...省略代码...
        }
    }
    //NioSocketChannel#isActive
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
    

    2)在AbstractChannel#register0的方法里,有个isActive的判断,具体就是NioSocketChannel#isActive,此时该判断返回的是true,随后会绕一圈,然后执行AbstractNioChannel#doBeginRead方法,内部会订阅SelectionKey.OP_READ事件。

    //NioEventLoop#processSelectedKey
    
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    //读事件和接收链接事件
    //1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
    //2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
    

    3)之前已经订阅了OP_READ事件,所以这里会执行usafe.read()方法。之前有说到过NioSocketChannel的unsafe实际上是AbstractNioByteChannel$NioByteUnsafe

    //AbstractNioByteChannel$NioByteUnsafe#read
    do {
        byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        if (allocHandle.lastBytesRead() <= 0) {
            // nothing was read. release the buffer.
            byteBuf.release();
            byteBuf = null;
            close = allocHandle.lastBytesRead() < 0;
            if (close) {
                // There is nothing left to read as we received an EOF.
                readPending = false;
            }
            break;
        }
    
        allocHandle.incMessagesRead(1);
        readPending = false;
        pipeline.fireChannelRead(byteBuf);
        byteBuf = null;
    } while (allocHandle.continueReading());
    

    4)read方法就是读取缓冲区的数据到ByteBuf,然后触发pipeline.fireChannelRead(byteBuf);把数据传播出去。

    //DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle#continueReading
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return config.isAutoRead() &&
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
               totalMessages < maxMessagePerRead &&
               totalBytesRead > 0;
    }
    

    5)每次读取数据会进行判断,是否继续读。其中有个重要的参数maxMessagePerRead在Nio里默认为16,也就是说针对一个channel每次最多读16次,防止某个channel数据量大时,一直读取数据,而忽略了其他channnel的数据读取。

    总结

    Netty的Reactor原理和NIO一样,只是进行了复杂的抽象和封装,每个步骤被散落到各个角度里。高度的抽象,对于代码理解是不易的,但是实现上变得容易扩展,上述例子只是用到了NIO的例子而已,实际上Netty支持的channel还有很多。可能各个channel的代码都走过一遍时,才会觉得Netty的抽象原来如此高明。个人水平有限,内容仅供参考,可自行验证准确性。

    参考资料

    1)一文让你彻底理解 Java NIO 核心组件 https://segmentfault.com/a/1190000017040893
    2)《Netty、Redis、Zookeeper高并发实战》第三章、第四章
    3)《Scalable IO in Java》(Doug Lea)http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
    4)Reacto模式以及Netty中的应用 https://zhuanlan.zhihu.com/p/152250231
    5)Netty源码分析 https://www.w3xue.com/exp/article/20191/15727.html
    6)Netty源码,4.1分支

    相关文章

      网友评论

          本文标题:Netty之Reactor模型

          本文链接:https://www.haomeiwen.com/subject/zovocltx.html