美文网首页
Java网络编程:Netty框架学习(三)---Reactor反

Java网络编程:Netty框架学习(三)---Reactor反

作者: singleZhang2010 | 来源:发表于2020-12-04 17:18 被阅读0次

概述

Reactor反应器模式在高性能网络编程中非常重要,高性能web服务器Nginx、高性能通信框架Netty都是基于反应器模式的。
反应器模式是高性能网络编程的必知、必会的模式。

Reactor 线程模型

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成

  • Reactor 反应器
    负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器监控的通道IO事件
  • Handlers 处理器
    与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等

单线程Reactor反应器

单线程Reactor反应器模式,即Reactor反应器和Handers处理器处于一个线程中执行。它是最简单的反应器模式,处理图如下


单线程Reactor反应器模式

在反应器模式中,需要用到SelectionKey中的attach和attachment方法,attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。
通过示例加深一下印象,创建ReactorDemo.java

package com.zhxin.nettylab.reactor.chapter1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @ClassName ReactorDemo
 * @Description //单线程Reactor反应器模式 Demo
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 下午 3:34
 **/
public class ReactorDemo implements Runnable{

    private static Selector selector;
    private static ServerSocketChannel serverSocketChannel;

    ReactorDemo() throws IOException {
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,获取通道
        serverSocketChannel.configureBlocking(false); //设为非阻塞

        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8989)); //将该通道对应的serverSocket绑定到port端口
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,监听"接收连接"事件

        sk.attach(new AcceptHandler());
    }

    public void run(){
        try{
            // 选择器轮询
            while (! Thread.interrupted()){
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while(keyIterator.hasNext()){
                    SelectionKey key = keyIterator.next();
                    dispatch(key);
                    selectionKeys.clear();
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 反应器的分发
     * */
    private void dispatch(SelectionKey key){
        Runnable rKey = (Runnable) key.attachment();
        if(rKey != null){
            rKey.run();
        }
    }

    /**
     * 新连接 处理器
     * */
    class AcceptHandler implements Runnable{

        public void run(){
            System.out.println("开始 新连接 处理");
            try{
                //接受新连接
                SocketChannel socket = serverSocketChannel.accept();
                if(socket != null){
                    // 为新连接创建一个输入输出的Handler处理器
                    new IOHandler(selector,socket);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
}

创建IOHandler.java

package com.zhxin.nettylab.reactor.chapter1;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * @ClassName IOHandler
 * @Description //输入输出处理器
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 下午 4:05
 **/
public class IOHandler implements Runnable {
    private final static int MAX_IN = 1024;
    private final static int MAX_OUT = 1024;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAX_IN);
    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    IOHandler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ return true;}
    boolean outputIsComplete() { /* ... */ return true;}
    void process() { /* ... */ }

    public void run(){
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}

上边代码中的AcceptorHandler处理器为内部类,在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例,作为附件,被设置(attach)到了SelectionKey中,它主要作用一是接受新连接,二是在为新连接创建一个输入输出的Handler处理器
当监听到新连接事件发生后,会通过选择器轮询事件分发,取出了之前attach到SelectionKey中的Handler业务处理器,进行socket的各种IO处理,为读写操作创建IOHandler。
IOHandler,顾名思义,就是负责socket的数据输入、业务处理、结果输出。
※代码参考:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

小结

在高性能服务器应用场景中,单线程反应器模式实际使用的很少,这里做个了解,熟悉一下场景即可。

多线程的Reactor反应器模式

多线程Reactor反应器的演进,分为两个方面:

  1. 首先是升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以考虑使用线程池
  2. 其次是升级Reactor反应器。可以考虑引入多个Selector选择器,提升选择大量通道的能力。

多线程Reactor反应器模式,通过示例代码加深一下印象,创建MultiThreadEchoServerReactor.java

package com.zhxin.nettylab.reactor.chapter2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName MultiThreadEchoServerReactor
 * @Description // 多线程Reactor反应器 demo
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 下午 4:25
 **/
public class MultiThreadEchoServerReactor {
    private ServerSocketChannel serverSocket;
    private AtomicInteger next = new AtomicInteger(0);
    //selectors集合,引入多个selector选择器
    Selector[] selectors = new Selector[2];
    //引入多个子反应器
    private SubReactor[] subReactors = null;

    private MultiThreadEchoServerReactor() throws IOException {

        //初始化多个选择器
        selectors[0] = Selector.open();
        selectors[1] = Selector.open();

        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(8989);
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);//非阻塞

        //让第一个选择器负责监听 新连接事件
        SelectionKey sk = serverSocket.register(selectors[0],SelectionKey.OP_ACCEPT);

        //为选择键绑定handler
        sk.attach(new AcceptHandler());

        //一个子选择器负责一个反应器
        SubReactor subReactor1 = new SubReactor(selectors[0]);
        SubReactor subReactor2 = new SubReactor(selectors[1]);
        subReactors = new SubReactor[]{subReactor1,subReactor2};
    }

    private void startService(){
        //一个子选择器 对应一个线程
        new Thread(subReactors[0]);
        new Thread(subReactors[1]);
    }

    /**
     * 新连接 处理器
     * */
    class AcceptHandler implements Runnable{

        public void run(){
            try{
                //接受新连接,进行业务处理
                SocketChannel socket = serverSocket.accept();
                if(socket != null){
                    // MultiThreadEchoHandler
                    new MultiThreadEchoHandler(selectors[next.get()],socket);
                }
                if(next.incrementAndGet() == selectors.length){
                    next.set(0);
                }
            }catch (Exception e){
                /*...*/
            }
        }
    }

    /**
     * 子反应器
     * */
    static class SubReactor implements Runnable{
        //每子选择器对应的线程负责一个选择器的查询和选择
        final Selector selector;

        SubReactor(Selector selector){
            this.selector = selector;
        }

        public void run(){
           try{
               while (!Thread.interrupted()){
                   selector.select();
                   Set<SelectionKey> selectionKeys = selector.selectedKeys();
                   Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                   while(keyIterator.hasNext()){
                       SelectionKey key = keyIterator.next();
                       dispatch(key);
                   }
                   selectionKeys.clear();
               }
           }catch (Exception e){
               /* ... */
           }
        }

        /**
         * 反应器的分发
         * */
        private void dispatch(SelectionKey key){
            //调用之前绑定的Handler
            Runnable rKey = (Runnable) key.attachment();
            if(rKey != null){
                rKey.run();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor();
        server.startService();
    }
}

创建业务处理器MultiThreadEchoHandler.java

package com.zhxin.nettylab.reactor.chapter2;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName MultiThreadEchoHandler
 * @Description //MultiThreadEchoHandler 新连接接收后的业务处理器
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 下午 4:52
 **/
public class MultiThreadEchoHandler implements Runnable {

    private final SocketChannel channel;
    private final SelectionKey sk;
    private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private static final int RECIEVING = 0, SENDING = 1;
    private int state = RECIEVING;

    //引入线程池
    private static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);

        //将本Handler作为sk选择键的附件,方便事件dispatch

        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);

        selector.wakeup();
    }
    public void run(){
        //异步任务,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    private synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                   System.out.println(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();

                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);

                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }
}

创建客户端MultiEchoClient.java

package com.zhxin.nettylab.reactor.chapter2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * @ClassName MultiEchoClient
 * @Description //多线程Reactor反应器 demo 客户端
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/4 0004 下午 5:07
 **/
public class MultiEchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress("localhost", 8989);


        SocketChannel socketChannel = SocketChannel.open(address);//获取通道(channel)
        socketChannel.configureBlocking(false); //切换成非阻塞模式
        //不断的自旋、等待连接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
       System.out.println("客户端启动成功!");

        //启动接受线程
        Processer processer = new Processer(socketChannel);
        new Thread(processer).start();

    }

    static class Processer implements Runnable {
        final Selector selector;
        final SocketChannel channel;

        Processer(SocketChannel channel) throws IOException {
            //Reactor初始化
            selector = Selector.open();
            this.channel = channel;
            channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(1024);

                            Scanner scanner = new Scanner(System.in);
                            System.out.println("请输入发送内容:");
                            if (scanner.hasNext()) {
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                String next = scanner.next();
                                buffer.put(("now time:"+System.currentTimeMillis() + " >>" + next).getBytes());
                                buffer.flip();
                                // 操作三:通过DatagramChannel数据报通道发送数据
                                socketChannel.write(buffer);
                                buffer.clear();
                            }

                        }
                        if (sk.isReadable()) {
                            // 若选择键的IO事件是“可读”事件,读取数据
                            SocketChannel socketChannel = (SocketChannel) sk.channel();

                            //读取数据
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int length = 0;
                            while ((length = socketChannel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                System.out.println("server echo:" + new String(byteBuffer.array(), 0, length));
                                byteBuffer.clear();
                            }

                        }
                        //处理结束了, 这里不能关闭select key,需要重复使用
                        //selectionKey.cancel();
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new MultiEchoClient().start();
    }
}

好了,以上就是多线程Reactor反应器模式 示例demo代码。

总结

反应器模式和生产者消费者模式对比
相似之处:在一定程度上,反应器模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入到一个队列中,一个或多个消费者主动地从这个队列中提取(Pull)事件来处理。不同之处在于:反应器模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler处理器来处理。
反应器模式和观察者模式(Observer Pattern)对比
相似之处在于:在反应器模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步地分发这些IO事件。观察者模式(ObserverPattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时,会通知所有观察者,它们能够执行相应的处理。不同之处在于:在反应器模式中,Handler处理器实例和IO事件(选择键)的订阅关系,基本上是一个事件绑定到一个Handler处理器;每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler处理器;而在观察者模式中,同一个时刻,同一个主题可以被订阅过的多个观察者处理。
反应器模式的优点如下:

  • 响应快,虽然同一反应器线程本身是同步的,但不会被单个连接的同步IO所阻塞;
  • 编程相对简单,最大程度避免了复杂的多线程同步,也避免了多线程的各个进程之间切换的开销;
  • 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。

示例代码地址

https://gitee.com/kaixinshow/java-nionetty-learning

相关文章

  • Java网络编程:Netty框架学习(三)---Reactor反

    概述 Reactor反应器模式在高性能网络编程中非常重要,高性能web服务器Nginx、高性能通信框架Netty都...

  • 彻底搞懂Netty高性能之零拷贝

    作为Java网络编程学习者,不仅要知道NIO,还一定要学习Mina和Netty这两个优秀的网络框架。 Netty高...

  • 提升能力从学习Netty开始

    netty 介绍 一、 Netty 是什么 Netty 是一个广泛使用的 Java 网络编程框架而Netty就是基...

  • Netty4(十五): 优化

    BossGroup 线程池优化 Netty 是基于 Reactor 模型实现的网络框架,但 Netty 并没有实现...

  • netty与springboot的整合

    netty框架 在网络编程领域,Netty是Java的一个优秀的框架,他将java的复杂和难以使用的关于OIO和N...

  • Netty之HelloWorld

    现在Java网络编程框架这块基本已经被Netty垄断了,几乎所有框架底层的RPC通信都是使用Netty来实现,Ne...

  • Netty4.x Internal Logger 机制

    Netty是一个简化Java NIO编程的网络框架。就像人要吃饭一样,框架也要打日志。Netty不像大多数框架,默...

  • java-netty

    netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...

  • Java NIO

    书本 Netty权威指南netty实战O’Reilly的《Java nio》Unix网络编程 《unix网络编程》...

  • Netty理论三:Netty线程模型

    1、Reactor模式:NIO网络框架的典型模式 Reactor是网络编程中的一种设计模式,reactor会解耦并...

网友评论

      本文标题:Java网络编程:Netty框架学习(三)---Reactor反

      本文链接:https://www.haomeiwen.com/subject/ehbbwktx.html