美文网首页Netty框架源码分析
5.Netty框架-网络IO编程模板(NIO实现Reactor线

5.Netty框架-网络IO编程模板(NIO实现Reactor线

作者: 还算年轻 | 来源:发表于2020-08-07 09:35 被阅读0次

    一、单线程Reactor线程模型

    1.单线程Reactor线程模型:新连接的接入、数据的读写都是用一个线程:
       public class SingleReactor implements Runnable{
    
       //单线程reactor 模型:1.一个线程绑定一个selector 和一个serverSocketChannel 
       ServerSocketChannel serverSocketChannel;
       
       Selector selector;
       
       //OPEN&注册accepet
       public void open() throws IOException{
           selector = Selector.open();
           serverSocketChannel = ServerSocketChannel.open();
           //设置非阻塞
           serverSocketChannel.configureBlocking(false);
           //绑定ip&端口
           serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
           //将channel注册到selector并拿到选择键(channel注册的标识)
           SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
           register.attach(new AcceptHandle());
       }
    
       //轮训注册的事件&分发
       @Override
       public void run() {
           try {
               open();
           } catch (IOException e1) {
                e1.printStackTrace();
           }
           try {
               //循环查询感兴趣的时间
               while (!Thread.interrupted()) {
                   try {
                       //1.阻塞去查
                       selector.select();
                       //2.拿到查询结果(注册的标识)
                       Set<SelectionKey> keys = selector.selectedKeys();
                       //3.迭代感兴趣的key
                       Iterator<SelectionKey> iterator = keys.iterator();
                       while (iterator.hasNext()) {
                           SelectionKey next = iterator.next();
                           dispatch(next);
                       }
                       keys.clear();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               }
           } catch (Exception e) {
               e.printStackTrace();
           }
       }   
       
       //分发&拿到绑定的处理器
       void dispatch(SelectionKey next){
           //1.拿出SelectionKey
           Runnable attachment = (Runnable) next.attachment();
           //2.调用handle处理器
           if (attachment != null) {
               attachment.run();
           }   
       }
       
       
       //接受连接处理器
       //连接处理器&为新连接创造一个输入输出的Handle处理器
       class AcceptHandle implements Runnable{
           @Override
           public void run() {
               try {
                   //1.接受新连接&调用下一个处理器注册读取事件
                   SocketChannel accept = serverSocketChannel.accept();
                   new IOEchoHandler(selector,accept);
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }   
       }
       
       //处理具体的IO时间处理器
       class IOEchoHandler implements Runnable{
    
           SocketChannel channel;//读写channel
           SelectionKey register;//channel注册结果返回的标识
           
           static final int RECIEVING = 0, SENDING = 1;
           int state = RECIEVING;
           
           //读写数据的缓存区
           final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
              
           IOEchoHandler(Selector selector, SocketChannel c) throws IOException {
               channel = c;
               c.configureBlocking(false);//设置阻塞
               //仅仅取得选择键,后设置感兴趣的IO事件
               register = channel.register(selector, 0);
               //将Handler作为选择键的附件
               register.attach(this);
               //第二步,注册Read就绪事件
               register.interestOps(SelectionKey.OP_READ);
               selector.wakeup();
           }
            
           
           @Override
           public void run() {
               try {
                   if (state == SENDING) {
                       //写入通道
                       channel.write(byteBuffer);
                       //写完后,准备开始从通道读,byteBuffer切换成写模式
                       byteBuffer.clear();
                       //写完后,注册read就绪事件
                       register.interestOps(SelectionKey.OP_READ);
                       //写完后,进入接收的状态
                       state = RECIEVING;
                   } else if (state == RECIEVING) {//一开始是接受情况
                       //从通道读到byteBuffer
                       int length = 0;
                       while ((length = channel.read(byteBuffer)) > 0) {
                           System.out.println(new String(byteBuffer.array(), 0, length));
                       }
                       //读完后,准备开始写入通道,byteBuffer切换成读模式
                       byteBuffer.flip();
                       //读完后,注册write就绪事件
                       register.interestOps(SelectionKey.OP_WRITE);
                       //读完后,进入发送的状态
                       state = SENDING;
                   }
                   //处理结束了, 这里不能关闭select key,需要重复使用
                   //sk.cancel();
               } catch (IOException ex) {
                   ex.printStackTrace();
               }
           }
       }
       
       
       public static void main(String[] args) throws IOException {
           new Thread(new SingleReactor()).start();
       }
    }
    

    二、多线程Reactor线程模型

    1.多线程Reactor线程模型:一个线程用于新连接的接入、数据的读写用一个线程池:
    class MultiThreadEchoHandler implements Runnable {
        final SocketChannel channel;
        final SelectionKey sk;
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        static final int RECIEVING = 0, SENDING = 1;
        int state = RECIEVING;
        //引入线程池
        static ExecutorService pool = Executors.newFixedThreadPool(4);
    
        MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
            channel = c;
            c.configureBlocking(false);
            //仅仅取得选择键,后设置感兴趣的IO事件
            sk = channel.register(selector, 0);
            //将本Handler作为sk选择键的附件,方便事件dispatch
            sk.attach(this);
            //向sk选择键注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        //线程处理handle
        public void run() {
            //异步任务,在独立的线程池中执行
            pool.execute(new AsyncTask());
        }
    
        //异步任务,不在Reactor线程中执行
        public synchronized void asyncRun() {
            try {
                if (state == SENDING) {
                    //写入通道
                    channel.write(byteBuffer);
                    //写完后,准备开始从通道读,byteBuffer切换成写模式
                    byteBuffer.clear();
                    //写完后,注册read就绪事件
                    sk.interestOps(SelectionKey.OP_READ);
                    //写完后,进入接收的状态
                    state = RECIEVING;
                } else if (state == RECIEVING) {
                    //从通道读
                    int length = 0;
                    while ((length = channel.read(byteBuffer)) > 0) {
                       System.out.println(new String(byteBuffer.array(), 0, length));
                    }
                    //读完后,准备开始写入通道,byteBuffer切换成读模式
                    byteBuffer.flip();
                    //读完后,注册write就绪事件
                    sk.interestOps(SelectionKey.OP_WRITE);
                    //读完后,进入发送的状态
                    state = SENDING;
                }
                //处理结束了, 这里不能关闭select key,需要重复使用
                //sk.cancel();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
        //异步任务的内部类
        class AsyncTask implements Runnable {
            public void run() {
                MultiThreadEchoHandler.this.asyncRun();
            }
        }
    
    }
    

    三、主从线程Reactor线程模型

    1.主从线程Reactor线程模型:一个连接池用于新连接的接入、数据的读写用另一个线程池:
    class MultiThreadEchoServerReactor {
        
        
        ServerSocketChannel serverSocket;
        AtomicInteger next = new AtomicInteger(0);
        //selectors集合,引入多个selector选择器
        Selector[] selectors = new Selector[2];
        //引入多个子反应器
        SubReactor[] subReactors = null;
    
    
        MultiThreadEchoServerReactor() throws IOException {
            //初始化多个selector选择器
            selectors[0] = Selector.open();
            selectors[1] = Selector.open();
            serverSocket = ServerSocketChannel.open();
    
            InetSocketAddress address =
                    new InetSocketAddress("127.0.0.1",8080);
            serverSocket.socket().bind(address);
            //非阻塞
            serverSocket.configureBlocking(false);
    
            //第一个selector,负责监控新连接事件
            SelectionKey sk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
            //附加新连接处理handler处理器到SelectionKey(选择键)
            sk.attach(new AcceptorHandler());
    
            //构建两个反应器
            //第一个子反应器,一子反应器负责一个选择器
            SubReactor subReactor1 = new SubReactor(selectors[0]);
            //第二个子反应器,一子反应器负责一个选择器
            SubReactor subReactor2 = new SubReactor(selectors[1]);
            subReactors = new SubReactor[]{subReactor1, subReactor2};
        }
    
        //开启两个反应器线程
        private void startService() {
            // 一子反应器对应一条线程
            new Thread(subReactors[0]).start();
            new Thread(subReactors[1]).start();
        }
    
        //反应器&分发
        class SubReactor implements Runnable {
            //每条线程负责一个选择器的查询
            final Selector selector;
    
            public SubReactor(Selector selector) {
                this.selector = selector;
            }
       
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        selector.select();
                        Set<SelectionKey> keySet = selector.selectedKeys();
                        Iterator<SelectionKey> it = keySet.iterator();
                        while (it.hasNext()) {
                            //Reactor负责dispatch收到的事件
                            SelectionKey sk = it.next();
                            dispatch(sk);
                        }
                        keySet.clear();
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
    
    
            void dispatch(SelectionKey sk) {
                Runnable handler = (Runnable) sk.attachment();
                //调用之前attach绑定到选择键的handler处理器对象
                if (handler != null) {
                    handler.run();
                }
            }
        }
    
    
        // Handler:新连接处理器
        class AcceptorHandler implements Runnable {
            public void run() {
                try {
                    SocketChannel channel = serverSocket.accept();
                    if (channel != null)
                        new MultiThreadEchoHandler(selectors[next.get()], channel);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (next.incrementAndGet() == selectors.length) {
                    next.set(0);
                }
            }
        }
    
    
        public static void main(String[] args) throws IOException {
            MultiThreadEchoServerReactor server =
                    new MultiThreadEchoServerReactor();
            server.startService();
        }
    
    }
    

    相关文章

      网友评论

        本文标题:5.Netty框架-网络IO编程模板(NIO实现Reactor线

        本文链接:https://www.haomeiwen.com/subject/ojsyrktx.html