美文网首页
2021-08-14_Nio多线程模型时序图分析

2021-08-14_Nio多线程模型时序图分析

作者: kikop | 来源:发表于2021-08-14 16:57 被阅读0次

    20210814_Nio多线程模型时序图分析

    1概述

    1.6关于Nio多线程的时序图

    1.6.1服务端初始化

    从左到右,依次为:Main-->MyBossHaneler-->MyRefactor

    [图片上传失败...(image-9715d-1628931332173)]


    image-20210814164326402.png

    1.6.2MyBossHandler

    1.6.3MyReactorTaskRunnable

    1.6.3.1轮询Boss事件1

    [图片上传失败...(image-e5bc3c-1628931332173)]


    image-20210814164506381.png

    1.6.3.2轮询Work事件2

    [图片上传失败...(image-e58fbc-1628931332173)]


    image-20210814164528239.png

    1.6.4MyWorkHandler

    [图片上传失败...(image-ef36df-1628931332173)]


    image-20210814164647998.png

    2代码实战(多线程版本)

    2.1maven依赖

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty.version}</version>
    </dependency>
    

    2.2配置

    2.3服务端

    2.3.1MultiThreadEchoServerReactor

    package com.kikop.myreactor.multipthreadapp.server;
    
    
    import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
    import com.kikop.myreactor.multipthreadapp.server.accept.MyBossHandler;
    import com.kikop.myreactor.multipthreadapp.server.refactor.MyReactorTaskRunnable;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: myrefactor_multipthreadapp
     * @file Name: EchoServerReactor
     * @desc 多线程反应器(一个Runnable任务)
     * @date 2021/6/22
     * @time 9:30
     * @by IDE: IntelliJ IDEA
     */
    public class MultiThreadEchoServerReactor {
    
        private ServerSocketChannel serverSocketChannel;
    
        // 2个 selector 选择器
        private Selector[] selectors = new Selector[2];
    
        // 2个子反应器线程,boss,work
        private MyReactorTaskRunnable[] myReactorTaskRunnables = null;
    
        public MultiThreadEchoServerReactor() throws IOException {
    
            // 1.初始化多个selector选择器
            selectors[0] = Selector.open(); //new WindowsSelectorProvider
            selectors[1] = Selector.open();
    
            // 2.创建 serverSocketChannel
            serverSocketChannel = ServerSocketChannel.open();
    
            // 2.1.开启服务端监听
            InetSocketAddress address =
                    new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                            NioDemoConfig.SOCKET_SERVER_PORT);
            serverSocketChannel.socket().bind(address);
    
            // 2.2.设置非阻塞
            serverSocketChannel.configureBlocking(false);
    
            // 3.创建选择键
    
            // 3.1.构建SelectionKey
            // 通过第一个 selector,负责监控新连接OP_ACCEPT事件
            // 对应通道:serverSocket
            SelectionKey sk = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);
            // 3.1.创建附件参数
            MyBossHandler attachObject = new MyBossHandler(selectors, serverSocketChannel);
    
            // 3.2.绑定附件参数:MyBossHandler
            // 附加新连接处理 MyBossHandler 处理器到 SelectionKey(选择键)
            sk.attach(attachObject);
    
            // 4.构建连个反应器线程
            // 4.1.第一个子反应器,一子反应器负责一个选择器
            MyReactorTaskRunnable subReactor1 = new MyReactorTaskRunnable(selectors[0]);
            // 4.2.第二个子反应器,一子反应器负责一个选择器
            MyReactorTaskRunnable subReactor2 = new MyReactorTaskRunnable(selectors[1]);
            myReactorTaskRunnables = new MyReactorTaskRunnable[]{subReactor1, subReactor2};
        }
    
        private void startService() {
    
            System.out.println("服务端开始启动...");
            // 2个线程,不断轮询、监听
            new Thread(myReactorTaskRunnables[0]).start(); // as server.boss,负责:OP_ACCEPT和事件分发
            new Thread(myReactorTaskRunnables[1]).start(); // as server.work,负载IO事件读写节和业务逻辑
            System.out.println("服务端启动成功!");
        }
    
    
        public static void main(String[] args) throws IOException {
    
            MultiThreadEchoServerReactor server =
                    new MultiThreadEchoServerReactor();
    
            server.startService();
        }
    
    }
    

    2.3.2MyBossHandler(accept连接建立,构建MyWorkHandler)

    package com.kikop.myreactor.multipthreadapp.server.accept;
    
    
    import com.kikop.myreactor.multipthreadapp.server.handler.MyWorkHandler;
    
    import java.io.IOException;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: BossHandler
     * @desc Boss连接处理器, 等同 于netty:bossNioEventLoop
     * boosGroup用于Accetpt连接建立事件
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class BossHandler
            implements Runnable {
    
        AtomicInteger next = new AtomicInteger(0);
    
        private Selector[] selectors;
        private ServerSocketChannel serverSocket;
    
        public BossHandler(Selector[] selectors, ServerSocketChannel serverSocket) {
            this.selectors = selectors;
            this.serverSocket = serverSocket;
        }
    
        /**
         * 事件分发 BossHandler 对应的处理逻辑
         * 构建 MyWorkHandler,如果是多线程,则会将此时的 SocketChannel 注册到另外一个 selector
         */
        public void run() {
            try {
    
                SocketChannel channel = serverSocket.accept();
                if (channel != null) {
    
                    // 将 channel 动态分配到 selector,通道IO事件就绪时,
                    // 由 selector 进行分发(MyReactorTaskRunnable不断轮询监听,调用对应的 handlerXXX.run)
    
                    // 顺序选择,每次连接对应的选择器
                    // 第一个 selector连接压力比较大:负责连接监听 ServerSocketChannel +业务处理 SocketChannel
                    // 第二个 selector 连接压力比较小:只负责业务处理 SocketChannel
                    new MyWorkHandler(selectors[next.get()], channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == selectors.length) { // 只能是:0,1
                next.set(0);
            }
        }
    }
    

    2.3.3MyReactorTaskRunnable(所有事件分发线程)

    处理附件类型如下:

    如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
    如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler

    package com.kikop.myreactor.multipthreadapp.server.handler;
    
    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.util.Iterator;
    import java.util.Set;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: MyReactorTask
     * @desc boss,work统一的线程,不同的是通道绑定的附件不一样
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class MyReactorTaskRunnable implements Runnable {
    
        // 每个线程负责一个选择器的查询
        final Selector selector;
    
        public MyReactorTaskRunnable(Selector selector) {
            this.selector = selector;
        }
    
        public void run() {
            try {
    
                while (!Thread.interrupted()) { // 非阻塞等待
    
                    // 负载某个 seleector 等待就绪事件
                    selector.select();
    
                    // 有读写,获取待就绪事件
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
    
                    while (it.hasNext()) {
                        // Reactor负责 dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    keySet.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
        /**
         * 获得就绪IO事件并完成事件分发
         *
         * @param sk
         */
        void dispatch(SelectionKey sk) {
    
            // 1.获取 handler
            // 1.1.如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
            // 1.2.如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler
            Runnable handler = (Runnable) sk.attachment();
    
            // 在当前线程中处理,调用之前 attach 绑定到选择键的handler处理器对象
            if (handler != null) {
                System.out.println("dispatch:"+handler.getClass().toString());
                handler.run();
            }
        }
    }
    

    2.3.4MyWorkHandler

    package com.kikop.myreactor.multipthreadapp.server.handler;
    
    
    import com.kikop.util.Logger;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: MyWorkHandler
     * @desc 执行业务处理逻辑(在一个主线程main开辟的线程池中)
     * 负责单个通道的同步sync读写
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class MyWorkHandler implements Runnable {
    
        final SocketChannel channel;
        final SelectionKey sk;
    
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
    
        static final int RECIEVING = 0, SENDING = 1;
    
        int state = RECIEVING;
    
        // 主线程 main:引入线程池
        static ExecutorService pool = Executors.newFixedThreadPool(4);
    
        /**
         * @param selector 可能是1或2
         * @param c
         * @throws IOException
         */
        public MyWorkHandler(Selector selector, SocketChannel c) throws IOException {
    
            channel = c;
            c.configureBlocking(false);
    
            // 取得选择键,设置感兴趣的IO事件
            sk = channel.register(selector, 0);
    
            // 将本 Handler:MultiThreadEchoHandler作为 MyReactorTaskRunnable.dispatch sk选择键的附件
            // 方便事件 dispatch
            sk.attach(this);
    
            // 向 SelectionKey 选择键注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
    
            // 使尚未返回的第一个选择操作立即返回,唤醒的
            // 原因是:注册了新的channel或者事件;channel关闭,取消注册;优先级更高的事件触发(如定时器事件),希望及时处理。
            selector.wakeup();
        }
    
        // MyReactorTaskRunnable.dispatch来触发
        public void run() {
            // 异步任务,在独立的线程池中执行
            pool.execute(new AysncPoolTask(this));
        }
    
        /**
         * synchronized 感觉没有意义 todo
         */
        public synchronized void process() {
            try {
                if (state == SENDING) {
                    //写入通道
                    channel.write(byteBuffer);
                    //写完后,准备开始从通道读,byteBuffer切换成写模式
                    byteBuffer.clear();
                    //写完后,注册read就绪事件
                    sk.interestOps(SelectionKey.OP_READ);
                    //写完后,进入接收的状态
                    state = RECIEVING;
                } else if (state == RECIEVING) {
                    // 从通道读
                    int length = 0;
                    // 注意:channel.read 定义1024,kernel 内核有个缓存
                    while ((length = channel.read(byteBuffer)) > 0) {
                        Logger.info(new String(byteBuffer.array(), 0, length));
                    }
                    //读完后,准备开始写入通道,byteBuffer切换成读模式
                    byteBuffer.flip();
                    //读完后,注册 write就绪事件
    
            
                    sk.interestOps(SelectionKey.OP_WRITE);
                    // 读完后,进入发送的状态
                    state = SENDING;
                }
                //处理结束了, 这里不能关闭select key,需要重复使用
                //sk.cancel();
    
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
    
    }
    

    2.3.4.1AysncPoolTask

    package com.kikop.myreactor.multipthreadapp.server.handler;
    
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: AysncPoolTask
     * @desc 线程程中的某个任务
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class AysncPoolTask implements Runnable {
    
        MyWorkHandler workHandler;
    
        public AysncPoolTask(MyWorkHandler workHandler) {
            this.workHandler = workHandler;
        }
    
        public void run() {
            this.workHandler.process();
        }
    }
    

    2.4客户端

    2.4.1EchoClient

    package com.kikop.myreactor.multipthreadapp.client;
    
    
    import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
    import com.kikop.myreactor.singlethreadapp.client.handler.MyTaskRunnable;
    import com.kikop.util.Print;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: EchoClient
     * @desc EchoClient
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class EchoClient {
    
        public void start() throws IOException {
    
            InetSocketAddress address =
                    new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
    
            // 1、获取网络通道
            SocketChannel socketChannel = SocketChannel.open(address);
    
            // 2、切换成非阻塞模式
            socketChannel.configureBlocking(false);
    
            // 不断的自旋、等待与服务端的连接完成,或者做一些其他的事情
            while (!socketChannel.finishConnect()) {
    
            }
            Print.tcfo("客户端启动成功!");
    
            // 3.启动业务处理线程
            MyTaskRunnable myTaskRunnable = new MyTaskRunnable(socketChannel);
            new Thread(myTaskRunnable).start();
    
        }
    
        public static void main(String[] args) throws IOException {
            new EchoClient().start();
        }
    }
    

    2.4.2MyTaskRunnable

    package com.kikop.myreactor.multipthreadapp.client.handler;
    
    import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
    import com.kikop.util.Dateutil;
    import com.kikop.util.Logger;
    import com.kikop.util.Print;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * @author kikop
     * @version 1.0
     * @project Name: mynettydemo
     * @file Name: MyTaskRunnable
     * @desc 客户端业务处理线程
     * @date 2021/6/22
     * @time 10:30
     * @by IDE: IntelliJ IDEA
     */
    public class MyTaskRunnable implements Runnable {
    
        // 客户端选择器(提供通道的IO读、写事件的注册)
        private final Selector selector;
        private final SocketChannel channel;
    
        /**
         * MyTaskRunnable
         *
         * @param channel
         * @throws IOException
         */
        public MyTaskRunnable(SocketChannel channel) throws IOException {
            selector = Selector.open(); // Reactor初始化
            this.channel = channel;
    
            channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }
    
        public void run() {
    
            try {
    
                while (!Thread.interrupted()) {
    
                    selector.select();
                    // 获取就绪事件列表,底层用两个数组进行筛选,Buffer有读写事件,操作系统产生中断
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
    
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) { // 通道对应的缓存可写
                            ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
    
                            Scanner scanner = new Scanner(System.in);
                            Print.tcfo("请输入发送内容:");
    
                            if (scanner.hasNext()) {
    
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                String next = scanner.next();
    
                                // 先写数据到buffer
                                buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
    
                                // buffer切为读
                                buffer.flip();
    
                                // 发送数据
                                // 通过 DatagramChannel数据报通道
                                socketChannel.write(buffer);
    
                                // 清空缓存
                                buffer.clear();
                            }
    
                        }
                        if (sk.isReadable()) { // 通道对应的缓存可读
                            // 若选择键的IO事件是“可读”事件,读取数据
                            SocketChannel socketChannel = (SocketChannel) sk.channel();
    
                            //读取数据
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
                            int length = 0;
                            // 注意:socketChannel.read需要等到一定的数量
                            while ((length = socketChannel.read(byteBuffer)) > 0) {
    
                                byteBuffer.flip(); // 方便下一次写数据到buffer
                                Logger.info("client recv:" + new String(byteBuffer.array(), 0, length));
                                // 业务处理完成,清空缓存
                                byteBuffer.clear();
                            }
                        }
                        // 处理结束了, 这里不能关闭 select key,需要重复使用
                        // selectionKey.cancel();
                    }
    
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    

    总结

    1.1优化演进

    模式演进分为2个方面:
    1)升级Refactor反应器。引入多个selector选择器,提升选择大量通道的能力。

    2)升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以使用线程池。

    总体如下:
    1)将负责输入输出处理的IO Handler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。

    2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器线程;同时,引入多个选择器,每一个反应器线程负责一个选择器,这样,充分释放系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。

    参考

    1大规模分布式系统架构与设计实战(彭渊)

    http://code.google.com/p/fourinone

    https://github.com/fourinone/fourinone

    2 fourinone分布式协调设计解析

    https://blog.51cto.com/3503265/1058623

    3Netty面试题(2021 最新版)

    https://blog.csdn.net/qq_17231297/article/details/117324371

    相关文章

      网友评论

          本文标题:2021-08-14_Nio多线程模型时序图分析

          本文链接:https://www.haomeiwen.com/subject/ovdebltx.html