美文网首页
Reactor Pattern

Reactor Pattern

作者: anyoptional | 来源:发表于2021-05-31 00:57 被阅读0次

    前言

    之前聊过几种常见的 I/O 模型,不过要说起当红炸子鸡还得是 I/O 多路复用,Java 1.4 引入的 nio package 提供了对这一模式的支持,著名开源框架 Netty 更是这一领域的扛鼎之作。

    Netty is an asynchronous event-driven network application framework for rapid
    development of maintainable high performance protocol servers & clients.
    

    Netty 官网介绍其是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。这里异步暂且不说,事件驱动可是大有文章,今天就聊聊这个吧。

    Event Driven

    一个典型的网络服务通常都有着如下基础结构:

    1. read request

    2. decode request

    3. process service

    4. encode reply

    5. send reply

    细究起来,每一步的性质和成本各不相同,这种不均衡会极大的影响吞吐量,分治策略通常是解决这一类型问题的最佳方案。我们可以把这其中的每一步都拆分成一个任务,每个任务都以非阻塞的方式运行。这里 I/O 事件充当了触发器的角色,想想前端火的一塌糊涂的响应式编程,不正是这样吗?

    Reactor Pattern

    基于这个设想,就诞生了 Reactor Pattern。Reactor Pattern 首先是事件驱动的,它有两个主要角色:

    1. Reactor,响应 I/O 事件并分发给对应的 Handler

    2. Handlers,非阻塞地执行相关操作

    从简单到复杂,Reactor Pattern 也有很多版本:单线程、多线程以及主从结构。纯理论总是看得人头疼,我们就以一个 echo server 为例,一点点升级它来说明 Reactor Pattern 的各种版本好了。所谓 echo server,就是客户端发什么服务端就回什么,这里我们约定以#字符标识一条完整的消息。

    单线程
    single-thread-reactor.png

    如上图所示,在单线程版本中, Reactor 和 Handlers 共享一个线程,因此连接的建立、数据的读取和处理以及发送响应均在同一个线程中完成。这个版本的好处是编码简单,弊端也是显而易见的,一旦某个环节是一个长时间执行的任务,所有的操作都会被阻塞;并且,单线程也无法发挥多核 CPU 的优势。

    @Slf4j
    public class SingleThreadedReactor implements Runnable {
    
        private final int port;
    
        public SingleThreadedReactor(int port) {
            this.port = port;
        }
    
        @Override
        public void run() {
            Selector sel = null;
            ServerSocketChannel ssc = null;
            try {
                // 开启Server端通道
                ssc = ServerSocketChannel.open();
                // 配置非阻塞
                ssc.configureBlocking(false);
                // 注册一个Acceptor来处理连接
                sel = Selector.open();
                // Acceptor作为attachment进行挂载
                ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(sel, ssc));
                // 绑定端口
                ssc.bind(new InetSocketAddress(port));
                log.info("Server启动成功,正在监听[{}]端口", port);
                // 轮询
                iAmListening(sel);
            } catch (IOException ex) {
                log.error("Server启动失败", ex);
            } finally {
                    // close ssc/sel omitted...
            }
        }
    
        private void iAmListening(Selector sel) {
            // 轮询
            while (!Thread.interrupted()) {
                try {
                    int num = sel.select();
                    // 有事件则处理
                    if (num > 0) {
                        Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                        // 逐个处理
                        while (iter.hasNext()) {
                            SelectionKey sk = iter.next();
                            // 派发事件
                            dispatch(sk);
                            // 处理过即移除,避免重复处理
                            iter.remove();
                        }
                    }
                } catch (IOException ex) {
                    log.error("监听失败", ex);
                }
            }
        }
    
        private void dispatch(SelectionKey sk) {
            // 事件处理程序都作为attachment挂载
            // 在SelectionKey上,并且包装成Runnable
            Object attachment = sk.attachment();
            if (attachment instanceof Runnable) {
                ((Runnable) attachment).run();
            }
        }
    }
    

    SingleThreadedReactor#run()由两大块构成,一是配置服务端的ServerSocketChannel,这一步的核心是挂载Acceptor(一个专门用来处理客户端接入的 Handler );二是轮询 I/O 事件,一旦有事件需要处理就进行派发。

    @Slf4j
    public class Acceptor implements Runnable {
    
        private final Selector sel;
    
        private final ServerSocketChannel ssc;
    
        public Acceptor(Selector sel, ServerSocketChannel ssc) {
            this.sel = sel;
            this.ssc = ssc;
        }
    
        @Override
        public void run() {
            try {
                // 获取与客户端通信的SocketChannel
                SocketChannel sc = ssc.accept();
                log.info("检测到新的客户端连接: {}", sc.getRemoteAddress());
                // Handler同样是一个Runnable,用来
                // 处理与客户端之间的 I/O 操作
                new Handler(sc, sel);
            } catch (IOException ex) {
                log.error("处理连接失败", ex);
            }
        }
    }
    

    Acceptor的职责是处理客户端的接入,一旦有新的客户端加入就建立起通道并将其注册到 Selector 上(通过 Handler 的构造函数)。

    @Slf4j
    public class Handler implements Runnable {
    
        enum State {
            READING,
            SENDING
        }
    
        private final SocketChannel sc;
    
        private final SelectionKey sk;
    
        // 初始状态为可读
        private State state = State.READING;
    
        private final ByteBuffer input = ByteBuffer.allocate(256);
        private final ByteBuffer output = ByteBuffer.allocate(256);
    
        public Handler(SocketChannel sc, Selector sel) {
            this.sc = sc;
            try {
                // 配置成非阻塞模式
                sc.configureBlocking(false);
                // 将自己也注册到Selector上,如此即可供Reactor派发
                this.sk = sc.register(sel, SelectionKey.OP_READ, this);
                // 注册完毕,让select()调用即刻返回,看看此时有没有事件要处理
                sel.wakeup();
            } catch (IOException ex) {
                throw new RuntimeException("Unable to handler event", ex);
            }
        }
    
        @Override
        public void run() {
            try {
                // 根据状态做处理
                if (state == State.READING) {
                    read();
                } else {
                    write();
                }
            } catch (IOException ex) {
                throw new RuntimeException("Unable to handler event", ex);
            }
        }
    
        private void read() throws IOException {
            log.info("正在读取客户端上送的信息");
            // 读取数据
            int oldPos = input.position();
            sc.read(input);
            int newPos = input.position();
            // 以 # 作为结束符,表示一条完整的消息
            for (int i = oldPos; i < newPos; i++) {
                byte b = input.get(i);
                // 已读到一条消息,进行处理
                if (b == '#') {
                    process(i);
                    log.info("处理完毕,准备发送响应信息");
                    state = State.SENDING;
                    sk.interestOps(SelectionKey.OP_WRITE);
                }
            }
        }
    
        private void write() throws IOException {
            output.flip();
            sc.write(output);
            output.clear();
            state = State.READING;
            sk.interestOps(SelectionKey.OP_READ);
        }
    
        private void process(int index) throws IOException {
            byte[] bytes = new byte[index + 1];
            input.flip();
            input.get(bytes);
            input.compact();
            // # 删掉
            String resp = new String(bytes, StandardCharsets.UTF_8)
                    .replace("#", "");
            log.info("接受到客户端[{}]的信息: {}", sc.getRemoteAddress(), resp);
            // 附加一点内容
            output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
            // 其它信息原样返回
            output.put(resp.getBytes(StandardCharsets.UTF_8));
        }
    }
    

    Handler负责处理 read -> decode -> process -> encode -> send 这一系列流程。在我们的例子里,只要没有读到#字符,就会一直读下去,直到客户端发送#字符告诉服务端一条完整的消息已经发送完了,服务端就着手进行处理并发送响应消息。

    多线程
    multi-thread-reactor.png

      多线程版本对 Handler 进行了优化,通过增加 worker threads 来处理非 I/O 操作,如此一来即使有耗时操作 Handler 也不会阻塞住 Reactor 了;同时多个 worker threads 也能更好地发挥多核 CPU 的优势。

    @Slf4j
    public class Handler implements Runnable {
    
        enum State {
            READING,
            SENDING
        }
    
        private final SocketChannel sc;
    
        private final SelectionKey sk;
    
        // 初始状态为可读
        private State state = State.READING;
    
        private final ByteBuffer input = ByteBuffer.allocate(256);
        private final ByteBuffer output = ByteBuffer.allocate(256);
    
        // 线程池,用来处理所有入站事件
        private final ExecutorService executor = new ThreadPoolExecutor(2,
                8,
                5, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    
        public Handler(SocketChannel sc, Selector sel) {
            this.sc = sc;
            try {
                // 配置成非阻塞模式
                sc.configureBlocking(false);
                // 将自己也注册到Selector上,如此即可供Reactor派发
                this.sk = sc.register(sel, SelectionKey.OP_READ, this);
                // 注册完毕,让select()调用即刻返回,看看此时有没有事件要处理
                sel.wakeup();
            } catch (IOException ex) {
                throw new RuntimeException("Unable to handler event", ex);
            }
        }
    
        @Override
        public void run() {
            try {
                // 根据状态做处理
                if (state == State.READING) {
                    // 读取数据
                    int oldPos = read();
                    // 提交到线程池,异步处理
                    executor.execute(() -> {
                        log.info("当前处理的线程: {}", Thread.currentThread().getName());
                        try {
                            process(oldPos);
                        } catch (IOException ex) {
                            throw new RuntimeException("Unable to read data", ex);
                        }
                    });
                } else {
                    // decode -> process -> encode
                    // 以上流程已经在 state == State.READING 时处理好了,此时直接发送就可以了,不用异步
                    write();
                }
            } catch (IOException ex) {
                throw new RuntimeException("Unable to handler event", ex);
            }
        }
    
        private synchronized int read() throws IOException {
            log.info("正在读取客户端上送的信息");
            // 读取数据
            int oldPos = input.position();
            sc.read(input);
            return oldPos;
        }
    
        private synchronized void write() throws IOException {
            log.info("正在给客户端返回响应信息");
            output.flip();
            sc.write(output);
            output.clear();
            state = State.READING;
            sk.interestOps(SelectionKey.OP_READ);
            sk.selector().wakeup();
        }
    
        private synchronized void process(int oldPos) throws IOException {
            // 计算读取的长度
            int newPos = input.position();
            // 以 # 作为结束符,表示一条完整的消息
            for (int i = oldPos; i < newPos; i++) {
                byte b = input.get(i);
                // 已读到一条消息,进行处理
                if (b == '#') {
                    byte[] bytes = new byte[i + 1];
                    input.flip();
                    input.get(bytes);
                    input.compact();
                    // # 删掉
                    String resp = new String(bytes, StandardCharsets.UTF_8)
                            .replace("#", "");
                    log.info("接受到客户端[{}]的信息: {}", sc.getRemoteAddress(), resp);
                    // 附加一点内容
                    output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
                    // 其它信息原样返回
                    output.put(resp.getBytes(StandardCharsets.UTF_8));
                    log.info("处理完毕,准备发送响应信息");
                    state = State.SENDING;
                    sk.interestOps(SelectionKey.OP_WRITE);
                    sk.selector().wakeup();
                }
            }
        }
    }
    

    多线程版本的ReactorAcceptor和单线程版本是一样的,区别只在 Handler 中引入了线程池,这些 worker threads 包办了流程中的 decode -> process -> encode。

    主从结构
    master-slave.png

      主从结构引入了多个 Reactor,将客户端接入和 socket 读、写进行了解耦:MainReactor 负责处理客户端的接入,而 SubReactor(s) 则负责在发生读、写事件时进行数据处理。

    @Slf4j
    public class MainReactor implements Runnable {
    
        private final int port;
    
        // 子Reactor数量
        private final int numberOfSubReactors;
    
        public MainReactor(int port, int numberOfSubReactors) {
            this.port = port;
            this.numberOfSubReactors = numberOfSubReactors;
        }
    
        @Override
        public void run() {
            Selector sel = null;
            ServerSocketChannel ssc = null;
            try {
                // 开启Server端通道
                ssc = ServerSocketChannel.open();
                // 配置非阻塞
                ssc.configureBlocking(false);
                // 注册一个Acceptor来处理连接
                sel = Selector.open();
                // Acceptor作为attachment进行挂载
                ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(ssc, numberOfSubReactors));
                // 绑定端口
                ssc.bind(new InetSocketAddress(port));
                log.info("Server启动成功,正在监听[{}]端口", port);
                // 轮询
                iAmListening(sel);
            } catch (IOException ex) {
                log.error("Server启动失败", ex);
            } finally {
                // close ssc/sel omitted...
            }
        }
    
        private void iAmListening(Selector sel) {
            // 轮询
            while (!Thread.interrupted()) {
                try {
                    int num = sel.select();
                    // 有事件则处理
                    if (num > 0) {
                        Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                        // 逐个处理
                        while (iter.hasNext()) {
                            SelectionKey sk = iter.next();
                            // 派发事件
                            dispatch(sk);
                            // 处理过即移除,避免重复处理
                            iter.remove();
                        }
                    }
                } catch (IOException ex) {
                    log.error("监听失败", ex);
                }
            }
        }
    
        private void dispatch(SelectionKey sk) {
            // 事件处理程序都作为attachment挂载
            // 在SelectionKey上,并且包装成Runnable
            Object attachment = sk.attachment();
            if (attachment instanceof Runnable) {
                ((Runnable) attachment).run();
            }
        }
    }
    

    MainReactor#run()和多线程版本的大体相同,区别主要在Acceptor

    @Slf4j
    public class Acceptor implements Runnable {
    
        private int next = 0;
    
        private final Selector[] selectors;
    
        private final ServerSocketChannel ssc;
    
        public Acceptor(ServerSocketChannel ssc, int numberOfSubReactors) throws IOException {
            this.ssc = ssc;
            // 每个SubReactor持有一个Selector,互不干扰,避免了潜在的线程同步需求
            Selector[] selectors = new Selector[numberOfSubReactors];
            SubReactor[] subReactors = new SubReactor[numberOfSubReactors];
            for (int i = 0; i < numberOfSubReactors; i++) {
                Selector sel = Selector.open();
                selectors[i] = sel;
                subReactors[i] = new SubReactor(sel);
                // 启动一个独立的线程处理SubReactor
                // SubReactor不处理连接请求,只处理OP_READ/OP_WRITE
                Thread thread = new Thread(subReactors[i]);
                thread.setName("SubReactor-thread-" + i);
                thread.start();
            }
            this.selectors = selectors;
        }
    
        @Override
        public void run() {
            try {
                // 获取与客户端通信的SocketChannel
                SocketChannel sc = ssc.accept();
                log.info("检测到新的客户端连接: {},当前线程: {}", sc.getRemoteAddress(), Thread.currentThread().getName());
                // 通过简单的轮询算法,给每一个SocketChannel分配一个Selector
                // Selector关联了SubReactor,而SubReactor又关联了一个独立的
                // 线程,这样每个SocketChannel上的发生的事件就有对应的线程来处理了
                new Handler(sc, selectors[next]);
                if (++next == selectors.length) next = 0;
            } catch (IOException ex) {
                log.error("处理连接失败", ex);
            }
        }
    }
    

    主从结构的Acceptor就略微有点复杂了,它的主要工作有二:其一是初始化多个 SubReactor,每个 SubReactor 都绑定一个 Selector,并且运行在独立的线程中;其二在建立新的通道时,将其分配给某一个 SubReactor。这样做的好处在于分摊了负载,更好地均衡了 CPU 和 I/O 速率。

    @Slf4j
    public class SubReactor implements Runnable {
    
        private final Selector sel;
    
        public SubReactor(Selector sel) {
            this.sel = sel;
        }
    
        @Override
        public void run() {
            // 轮询
            while (!Thread.interrupted()) {
                try {
                    int num = sel.select(1000);
                    // 有事件则处理
                    if (num > 0) {
                        Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
                        // 逐个处理
                        while (iter.hasNext()) {
                            SelectionKey sk = iter.next();
                            // 派发事件
                            dispatch(sk);
                            // 处理过即移除,避免重复处理
                            iter.remove();
                        }
                    }
                } catch (IOException ex) {
                    log.error("监听失败", ex);
                }
            }
        }
    
        private void dispatch(SelectionKey sk) {
            if (!sk.isValid()) return;
            String ops = "";
            if (sk.isReadable()) {
                ops = "有新数据可读";
            }
            if (sk.isWritable()) {
                if (ops.length() != 0) {
                    ops = ops + ","+ "有数据可写";
                } else {
                    ops = "有数据可写";
                }
            }
            log.info("检测到新的事件: {},当前线程: {}", ops, Thread.currentThread().getName());
            // 事件处理程序都作为attachment挂载
            // 在SelectionKey上,并且包装成Runnable
            Object attachment = sk.attachment();
            if (attachment instanceof Runnable) {
                ((Runnable) attachment).run();
            }
        }
    }
    

    SubReactor 和 MainReactor 类似,它也维护了一个事件循环,用来派发 socket 读、写事件。Handler 和多线程版本保持一致,同样使用了线程池来处理非 I/O 任务,完整的示例看这里

    Netty 对 Reactor Pattern 的支持

    Netty 对 Reactor Pattern 的几个变种都有支持,简单配置一下 ServerBootstrap 即可实现。

    Version Implementation
    单线程 NioEventLoopGroup group = new NioEventLoopGroup(1);
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    多线程 NioEventLoopGroup group = new NioEventLoopGroup(3);
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    主从结构 NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(4);
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup);

    参考

    Doug Lea: Scalable IO in Java

    相关文章

      网友评论

          本文标题:Reactor Pattern

          本文链接:https://www.haomeiwen.com/subject/jnrvsltx.html