美文网首页Java 杂谈Java IO
【译】Scalable IO in Java 可伸缩Java I

【译】Scalable IO in Java 可伸缩Java I

作者: 理查德成 | 来源:发表于2019-08-14 09:31 被阅读1次

原文地址: http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

作者信息:
Doug Lea State University of New York at Oswego
dl@cs.oswego.edu
http://gee.cs.oswego.edu

译者水平有限,如有错误和纰漏,欢迎指正

大纲

  • 可伸缩网络服务

  • 事件驱动处理

  • reactor模式

    • 基础版本
    • 多线程版本
    • 其他变种
  • java nio 非阻塞IO API一览

网络服务

web服务, 分布式对象, 等大多数网络服务具有相同的基本结构:

  1. Read request, 读请求
  2. Decode request, 解码请求
  3. Process service, 处理服务
  4. Encode reply, 编码响应
  5. Send reply, 发送响应

但是每个步骤的性质和成本又有所不同: xml解析, 文件传输, web页生成以及计算服务等...

经典服务设计
经典服务设计

每一个处理程序(上图中的handler)都拥有自己的线程

经典ServerSocket 循环
public class ClassicServerSocketLoop {

    private static final int PORT = 1992;
    private static final int MAX_INPUT = 1024;

    class Server implements Runnable {

        @Override
        public void run() {
            try (ServerSocket ss = new ServerSocket(PORT)) {
                while (!Thread.interrupted()) {
                    // 这里可使用单线程处理,或者线程池管理多个线程
                    new Thread(new Handler(ss.accept())).start();
                }
            } catch (IOException ignored) {
                // ignored
            }
        }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ignored) {
                // ignored
            }
        }

        private byte[] process(byte[] input) {
            // 业务处理逻辑
            return new byte[0];
        }
    }
}
可伸缩目标
  • 负载不断增加时能优雅降级 (更多客户端接入,负载增加)

  • 资源增加时性能能够持续提升(CPU, 内存, IC盘, 带宽等资源)

  • 同样满足可用性以及性能目标:

    • 低延迟
    • 满足高峰需求
    • 可调节的服务质量
  • 分而治之(Divide and Conquer)通常是实现可伸缩目标的最有效方式

分而治之
  1. 将处理程序划分为小的任务, 每个小任务以非阻塞的方式执行.

  2. 当小任务可以执行时, 再执行任务. 通常, IO事件充当触发器的角色:

分而治之
  1. java nio支持的基本机制:

    • 非阻塞的读和写
    • 分发与IO事件关联的任务
  2. 无尽的变化可能

    • 一系列事件驱动设计

事件驱动设计

事件驱动设计通常比其他同类的可选设计更加有效:

  • 所需资源更少: 无需为每个客户端分配一个线程
  • 更少的开销: 更少的上下文切换, 更少的同步操作
  • 但是分发会更慢: 必须手动将事件和处理程序绑定

事件驱动设计在编码上更加复杂:

  • 必须将一个完整的任务切分为简单的非阻塞任务

    • 与GUI事件启动动作相似
    • 不能消除所有的阻塞, 比如: GC, 页错误等
  • 必须持续跟踪服务的逻辑状态

背景资料: AWT中的事件
AWT

事件驱动IO使用相同的思想, 但是设计方面有所不同

reactor模式
  1. 响应IO事件时, reactor将事件分发给合适的处理器处理——与AWT线程相似
  2. 处理器执行非阻塞操作——与AWT的ActionListeners相似
  3. 将事件处理器绑定到具体事件——与AWT的 addActionListener操作相似
  4. 参考 Schmidt et al, Pattern-Oriented Software Architecture, Volume 2 (POSA2), 或者Richard Stevens's的网络编程书籍, 以及Matt Welsh's的SEDA架构等.
基本reactor设计
reactor设计

以上是单线程版本

java nio支持
  1. 渠道channel: channel是文件, socket等的连接, 支持非阻塞读
  2. 缓冲buffer: buffer是数组一样的对象, 可直接被channel读写
  3. 选择器selector: select可监控注册在其上的channel集合IO事件的发生
  4. selectionKey: selectionKey维护IO事件的状态以及事件和处理器的绑定关系(原本的selection并不支持维护绑定关系的功能, 只是reactor模式利用selectionKey的attachment特性实现了这一功能而已)
reactor 1: 初始设置
class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
    // 类未完
reactor 2: 分发循环
    // 继续类Reactor
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> sks = selector.selectedKeys();
                Iterator<SelectionKey> it = sks.iterator();
                while (it.hasNext()) {
                    dispatcher(it.next());
                }
                // 也可以在while循环中使用iterator的remove方法
                sks.clear();
            }
        } catch (IOException ignored) {
            // ignored
        }
    }

    private void dispatcher(SelectionKey sk) {
        Runnable r = (Runnable) sk.attachment();
        if (null != r) {
            r.run();
        }
    }
    // 类未完
reactor 3: Acceptor

Acceport也是处理器EventHandler的一种, 用于处理socket accept事件

    // 继续类Reactor
    class Acceptor implements Runnable {

        @Override
        public void run() {
            try {
                SocketChannel c = serverSocketChannel.accept();
                if (null != c) {
                    new Handler(selector, c);
                }
            } catch (IOException ignored) {
                // ignored
            }
        }
    }
}
// 类完成
reactor 4: handler设置
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;

    static final int MAX_IN = 1024;
    static final int MAX_OUT = 1024;
    ByteBuffer input = ByteBuffer.allocate(MAX_IN);
    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        // 加入实现
        return true;
    }

    boolean outputIsComplete() {
        // 加入实现
        return true;
    }

    void process() {
        // 加入实现
    }
    // 类未完
reactor 5: 请求处理
    // 继续类Handler
    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                write();
            }
        } catch (IOException ignored) {
            // ignored.
        }
    }

    void read() throws IOException {
        socket.read(input);
        process();
        state = SENDING;
        sk.interestOps(SelectionKey.OP_WRITE);
    }

    void write() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }
}
// 类完成
另一种handler实现

使用GoF设计模式, 状态模式: 再绑定合适的处理器作为selectionKey的attchment

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;

    static final int MAX_IN = 1024;
    static final int MAX_OUT = 1024;
    ByteBuffer input = ByteBuffer.allocate(MAX_IN);
    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        // 加入实现
        return true;
    }

    boolean outputIsComplete() {
        // 加入实现
        return true;
    }

    void process() {
        // 加入实现
    }

    @Override
    public void run() {
        try {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                sk.attach(new Sender());
                sk.interestOps(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        } catch (IOException ignored) {
            // ignored
        }
    }

    class Sender implements Runnable {

        @Override
        public void run() {
            try {
                socket.write(output);
                if (outputIsComplete()) {
                    sk.cancel();
                }
            } catch (IOException ignored) {
                // ignored.
            }
        }
    }
}
多线程设计
  1. 为可伸缩性考虑添加多线程: 主要适用于多处理器

  2. 工作线程

    • Reactor应该快速触发处理器: 处理器的处理过程减慢了reactor的速度
    • 将非IO处理放到其他线程中
  3. 多reactor线程

    • reactor线程可以只做饱和IO, 将业务负载分发给其他线程: 采用负载均衡匹配CPU和IO速率
工作线程
  1. 卸载非IO处理, 以此加速reactor线程: 类似于 POSA2 Proactor设计

  2. 比将计算密集型处理重构为事件驱动形式更为简单

    • 还应该是纯非阻塞计算, 足够的任务逻辑来抵消开销
  3. 但是与IO处理同时发生会更难

    • 当可以首先将所有数据读入一个buffer, 最好不过了
  4. 使用线程池, 便于调整和控制

    • 一般情况下所需的线程数少于客户端数量
工作线程池
工作线程池
使用线程池的handler
static class PoolHandler implements Runnable {

    final SocketChannel socket;
    final SelectionKey sk;

    static ExecutorService pool = Executors.newFixedThreadPool(100);
    static final int PROCESSING = 3;
    static final int MAX_IN = 1024;
    static final int MAX_OUT = 1024;
    ByteBuffer input = ByteBuffer.allocate(MAX_IN);
    ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    public PoolHandler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        // 加入实现
        return true;
    }

    boolean outputIsComplete() {
        // 加入实现
        return true;
    }

    void process() {
        // 加入实现
    }

    synchronized void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            // 使用线程池处理业务
            pool.execute(new Processor());
        }
    }

    synchronized void write() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

    synchronized void processAndHandOff() {
        process();
        // or rebind attachment
        state = SENDING;
        sk.interestOps(SelectionKey.OP_WRITE);
    }


    @Override
    public void run() {
        try {
            if (SENDING == state) {
                write();
            } else if (READING == state) {
                read();
            }
        } catch (IOException ignored) {
            // ignored.
        }
    }

    class Processor implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }
}
协调任务
  • 任务接力
    每个任务触发或者调用下一个任务. 这种方式通常快速, 但是也很脆弱.

  • 回调

  • 队列

  • Futures (java future ?)

使用 PooledExecutor
  • 可调节的的工作线程池

  • main方法 execute(Runnable r)

  • 控制:

    • 任务队列的类型 (任何channel)
    • 线程最大数量
    • 线程最小数量
    • "Warm" versus on-demand threads (不知道怎么翻译)
    • 空闲线程消亡的keep-alive时间间隔: 如有需要,后面可以更换新的
    • 饱和策略: 阻塞, drop, producer-runs 等
多个reactor线程

使用reactor线程池:

- 用来匹配CPU和IO速率
- 静态或者动态的构造方式: 每一个都拥有selector, 线程以及分发循环
- 主acceptor分发给其他reactor处理accept事件
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
    public synchronized void run() { ...
        Socket connection = serverSocket.accept();
        if (connection != null)
            new Handler(selectors[next], connection);
        if (++next == selectors.length) next = 0;
    }
}

使用多个reactor:

使用多个reactor
使用其他java nio特性

一个reactor多个selector

  • 绑定不同的处理器到不同的IO事件
  • 需要认真仔细使用同步来协调多线程

文件传输

  • 文件到网络或网络到文件的自动复制

内存映射文件

  • 使用buffer访问文件

直接buffer

  • 有时可以实现零拷贝传输
  • 但是有启动和回收垃圾开销
  • 适用于长连接的应用
基于连接的扩展

非单个服务连接

  • 客户端连接
  • 客户端发送一系列请求/消息
  • 客户端断开连接

例子

  • 数据库和事务监控
  • 多个参与者的游戏, 聊天服务等

可扩展基本的网络服务模式

  • 处理许多相对长连接的客户端
  • 跟踪客户端session状态
  • 分发跨域主机服务

API一览

  • Buffer
  • ByteBuffer/CharBuffer/LongBuffer等
  • Channel
  • SelectableChannel
  • SocketChannel
  • ServerSocketChannel
  • FileChannel
  • Selector
  • SelectionKey

Buffer

abstract class Buffer {
    int capacity();
    int position();
    Buffer position(int newPosition);
    int limit();
    Buffer limit(int newLimit);
    Buffer mark();
    Buffer reset();
    Buffer clear();
    Buffer flip();
    Buffer rewind();
    int remaining();
    boolean hasRemaining();
    boolean isReadOnly();
}
buffer

ByteBuffer

abstract class ByteBuffer extends Buffer {
    static ByteBuffer allocateDirect(int capacity);
    static ByteBuffer allocate(int capacity);
    static ByteBuffer wrap(byte[] src, int offset, int len);
    static ByteBuffer wrap(byte[] src);
    boolean isDirect();
    ByteOrder order();
    ByteBuffer order(ByteOrder bo);
    ByteBuffer slice();
    ByteBuffer duplicate();
    ByteBuffer compact();
    ByteBuffer asReadOnlyBuffer();
    byte get();
    byte get(int index);
    ByteBuffer get(byte[] dst, int offset, int length);
    ByteBuffer get(byte[] dst);
    ByteBuffer put(byte b);
    ByteBuffer put(int index, byte b);
    ByteBuffer put(byte[] src, int offset, int length);
    ByteBuffer put(ByteBuffer src);
    ByteBuffer put(byte[] src);
    char getChar();
    char getChar(int index);
    ByteBuffer putChar(char value);
    ByteBuffer putChar(int index, char value);
    CharBuffer asCharBuffer();
    short getShort();
    short getShort(int index);
    ByteBuffer putShort(short value);
    ByteBuffer putShort(int index, short value);
    ShortBuffer asShortBuffer();
    int getInt();
    int getInt(int index);
    ByteBuffer putInt(int value);
    ByteBuffer putInt(int index, int value);
    IntBuffer asIntBuffer();
    long getLong();
    long getLong(int index);
    ByteBuffer putLong(long value);
    ByteBuffer putLong(int index, long value);
    LongBuffer asLongBuffer();
    float getFloat();
    float getFloat(int index);
    ByteBuffer putFloat(float value);
    ByteBuffer putFloat(int index, float value);
    FloatBuffer asFloatBuffer();
    double getDouble();
    double getDouble(int index);
    ByteBuffer putDouble(double value);
    ByteBuffer putDouble(int index, double value);
    DoubleBuffer asDoubleBuffer();
}

Channel

interface Channel {
    boolean isOpen();
    void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
    int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
    int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    int write(ByteBuffer[] srcs) throws IOException;
}

SelectableChannel

abstract class SelectableChannel implements Channel {
    int validOps();
    boolean isRegistered();
    SelectionKey keyFor(Selector sel);
    SelectionKey register(Selector sel, int ops)
    throws ClosedChannelException;
    void configureBlocking(boolean block)
    throws IOException;
    boolean isBlocking();
    Object blockingLock();
}

SocketChannel

abstract class SocketChannel implements ByteChannel ... {
    static SocketChannel open() throws IOException;
    Socket socket();
    int validOps();
    boolean isConnected();
    boolean isConnectionPending();
    boolean isInputOpen();
    boolean isOutputOpen();
    boolean connect(SocketAddress remote) throws IOException;
    boolean finishConnect() throws IOException;
    void shutdownInput() throws IOException;
    void shutdownOutput() throws IOException;
    int read(ByteBuffer dst) throws IOException;
    int read(ByteBuffer[] dsts, int offset, int length)
    throws IOException;
    int read(ByteBuffer[] dsts) throws IOException;
    int write(ByteBuffer src) throws IOException;
    int write(ByteBuffer[] srcs, int offset, int length)
    throws IOException;
    int write(ByteBuffer[] srcs) throws IOException;
}

ServerSocketChannel

abstract class ServerSocketChannel extends ... {
    static ServerSocketChannel open() throws IOException;
    int validOps();
    ServerSocket socket();
    SocketChannel accept() throws IOException;
}

FileChannel

abstract class FileChannel implements ... {
    int read(ByteBuffer dst);
    int read(ByteBuffer dst, long position);
    int read(ByteBuffer[] dsts, int offset, int length);
    int read(ByteBuffer[] dsts);
    int write(ByteBuffer src);
    int write(ByteBuffer src, long position);
    int write(ByteBuffer[] srcs, int offset, int length);
    int write(ByteBuffer[] srcs);
    long position();
    void position(long newPosition);
    long size();
    void truncate(long size);
    void force(boolean flushMetaDataToo);
    int transferTo(long position, int count,
    WritableByteChannel dst);
    int transferFrom(ReadableByteChannel src,
    long position, int count);
    FileLock lock(long position, long size, boolean shared);
    FileLock lock();
    FileLock tryLock(long pos, long size, boolean shared);
    FileLock tryLock();
    static final int MAP_RO, MAP_RW, MAP_COW;
    MappedByteBuffer map(int mode, long position, int size);
}
// NOTE: ALL methods throw IOException

Selector

abstract class Selector {
    static Selector open() throws IOException;
    Set keys();
    Set selectedKeys();
    int selectNow() throws IOException;
    int select(long timeout) throws IOException;
    int select() throws IOException;
    void wakeup();
    void close() throws IOException;
}

SelectionKey

abstract class SelectionKey {
    static final int OP_READ, OP_WRITE,
    OP_CONNECT, OP_ACCEPT;
    SelectableChannel channel();
    Selector selector();
    boolean isValid();
    void cancel();
    int interestOps();
    void interestOps(int ops);
    int readyOps();
    boolean isReadable();
    boolean isWritable();
    boolean isConnectable();
    boolean isAcceptable();
    Object attach(Object ob);
    Object attachment();
}

相关文章

网友评论

    本文标题:【译】Scalable IO in Java 可伸缩Java I

    本文链接:https://www.haomeiwen.com/subject/tdpwjctx.html