美文网首页Java 杂谈javaJava设计模式
第十二节 netty前传-NIO 实现reactor模式

第十二节 netty前传-NIO 实现reactor模式

作者: 勃列日涅夫 | 来源:发表于2018-12-24 20:54 被阅读1次

本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty的底层实现,对于nio的reactor模式的实现,对于学习netty也是尤为重要的一步。

首先先作为对比先看下经典BIO模型

图片.png
  • 伪代码片段
class Server implements Runnable {
  public void run() {
    try {
//创建socket server套接字
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
//启动额外线程处理socket客户端连接后的业务处理
        new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
    } catch (IOException ex) { /* ... */ }
  }
//业务处理
  static class Handler implements Runnable {
    final Socket socket;
    Handler(Socket s) { socket = s; }
    public void run() {
      try {
byte[] input = new byte[MAX_INPUT];
        socket.getInputStream().read(input);
        byte[] output = process(input);
        socket.getOutputStream().write(output);
      } catch (IOException ex) { /* ... */ }
    }
    private byte[] process(byte[] cmd) { /* ... */ }
  }
}

nio不再详细介绍,可参考前面文章

主要说下reactor模式:简单来说reactor模式用于同时处理一个或多个传递给服务端的请求的事件的处理模式。 然后,服务端处理程序解析输入别的请求,并将它们同步分派给与之关联的请求异步处理程序。不恰当可类比web页面事件,当点击某个按钮时,浏览器收到这个信号(监听),分派给相关的js处理程序处理(handler)。

  • 注意两点 1、同步分派 2、异步处理程序
    而上面两点的分派由Dispatch承担,异步处理由handler处理。
    详细可参考reactor wiki介绍
    3、 reactor实现
  • 借用doug的图 直观的感受下:


    图片.png

关于Channels、Buffers、Selectors、SelectionKeys核心前面已介绍,下面实现会用到

基本代码讲解

  • Reactor类 作为nio 响应器模式的核心部分。承载了selector选择器、ServerSocketChannel 服务端的通道。三个重要的功能
  1. 对ServerSocketChannel、Selector初始化。 serverSocket.register方法将服务通道注册到选择器,并绑定ACCEPT兴趣事件(初始绑定,当客户端通道连接后会先处理accpet事件)

  2. 初始附加一个Acceptor对象(通过SelectionKey 的attach方法)。目的是用于分发时,借助Acceptor处理相关逻辑.

  3. dispatch方法的同步分发功能

  • Acceptor 类的作用其一是获取客户端与服务端的连接,其二是获取连接后调用handler处理(为了简化,handler使用状态模式来模拟其他事件,所以这里一旦有客户端连接,就会通过初始设置READING = 0 读事件)
class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        //注册ServerSocketChannel的兴趣事件为连接OP_ACCEPT
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        //附加Acceptor,稍后调用attachment可以取得该对象
        sk.attach(new Acceptor());
    }
    public void run() {  //normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                //阻塞 直到有有一個通道返回
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //循环检测是否有新事件注册
                while (it.hasNext())
                    //同步分发
                    dispatch((SelectionKey)(it.next()));
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    void dispatch(SelectionKey k) {
        //取得attach附加的对象处理
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }

    // class Reactor continued
class Acceptor implements Runnable {
        // inner
        public void run() {
            try {
          //接受到通道套接字的连接
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}
  • Handler 类 1、将自身绑定到选择器、并注册读事件( sk = socket.register( sel, 0); sk.attach(this);)2、 根据READING SENDING 状态判断事件
/**
 * handler用到状态模式,根据当前读写的状态分别处理
 */
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    static final int READING  = 0 ,SENDING = 1;
    int state = READING;
    Handler(Selector sel , SocketChannel c) throws IOException {
        socket = c;
        //设置通道为非阻塞
        c.configureBlocking(false);
// Optionally try first read now
        sk = socket.register( sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        //select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select  那么执行完wakeup后下一个执行select就会立即返回。
        sel.wakeup();
    }
    boolean inputIsComplete()  {/* 相关处理略... */ return true; }
    boolean outputIsComplete() { /*相关处理略 ... */return true; }
    void process(){}
    public void run() {
        try {
            if (state == READING)
                read();
            else if (state ==SENDING)
                send();
        } catch (IOException ex) { /* ... */ }
    }
    void read()throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            // 处理完成后设置发送状态
            state =SENDING;
            //注册写事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send()throws IOException {
        socket.write(output);
        if (outputIsComplete())
            sk.cancel();
    }
}

参考:
github java设计模式
Doug Lea - java 并发包的作者讲解NIO

相关文章

  • 第十二节 netty前传-NIO 实现reactor模式

    本节所要了解java nio的reactor模式,参考来源Doug lee java并发的作者。当然作为netty...

  • Netty简介

    1.Netty是什么? Netty是高性能、异步事件驱动的非阻塞(NIO)Reactor模式的socket通信...

  • Netty浅析

    IO模型 IO多路复用模式:Reactor、Proactor NIO实现的是Reactor模式。通过select、...

  • netty网络模型

    nio是基于reactor模型来设计的,所以netty也是基于reactor模型的,reactor模型有以下三种 ...

  • Netty与Reactor 模式

    前言 Netty 的线程模型是基于NIO的Selector 构建的,使用了异步驱动的Reactor 模式来构建的线...

  • Netty NioEventLoop源码解读

    Netty NioEventLoop Reactor 模型 Netty实现并扩展了Reactor模型,为了更好的了...

  • Reactor模式总结

    Reactor是基于NIO中实现多路复用的一种模式. 什么是Reactor模式 同步的等待多个事件源到达(采用se...

  • Reactor反应式模式(事件驱动模式)

    一 reactor的前世今生 高性能IO绕不开Reactor模式,NIO是Java对Reactor模式的实践,而...

  • 自顶向下深入分析Netty(五)--Future

    再次回顾这幅图,在上一章中,我们分析了Reactor的完整实现。由于Java NIO事件驱动的模型,要求Netty...

  • Netty理论三:Netty线程模型

    1、Reactor模式:NIO网络框架的典型模式 Reactor是网络编程中的一种设计模式,reactor会解耦并...

网友评论

    本文标题:第十二节 netty前传-NIO 实现reactor模式

    本文链接:https://www.haomeiwen.com/subject/lhvpxqtx.html