Learning Pipe

Author: Astimegoes | Published 2019-08-26 19:49

    A pair of channels that implements a unidirectional pipe.

    A pipe consists of a pair of channels:
    A writable Pipe.SinkChannel
    A readable Pipe.SourceChannel
    Once some bytes are written to the sink channel they can be read from the source channel in exactly the
    order in which they were written.

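    The above is the Javadoc comment on the Pipe class. In short, a Pipe holds a pair of channels:
    a write-only SinkChannel and a read-only SourceChannel. Bytes written to the SinkChannel can be read from the SourceChannel in the order they were written.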

    Main methods of Pipe

    // Returns the SourceChannel (the readable end)
    public abstract SourceChannel source();
    
    // Returns the SinkChannel (the writable end)
    public abstract SinkChannel sink();
    
    // Opens a new Pipe
    public static Pipe open() throws IOException;
    

    SinkChannel declaration

    
    public abstract static class SinkChannel extends AbstractSelectableChannel implements WritableByteChannel, GatheringByteChannel
    
    

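    AbstractSelectableChannel: the channel can be registered with a Selector
    WritableByteChannel: the channel can be written to
    GatheringByteChannel: supports gathering writes, which as I understand it means a single write call can drain several ByteBuffers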
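
    To make the gathering write concrete, here is a minimal sketch. The class name, method name and message contents are made up for illustration; only Pipe, ByteBuffer and write(ByteBuffer[]) come from the JDK.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the JDK.
class GatheringWriteDemo {

    // Writes two buffers with gathering write() calls and reads the bytes back.
    static String writeHeaderAndBody() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            ByteBuffer header = ByteBuffer.wrap("head|".getBytes(StandardCharsets.UTF_8));
            ByteBuffer body = ByteBuffer.wrap("body".getBytes(StandardCharsets.UTF_8));
            ByteBuffer[] parts = {header, body};

            // write(ByteBuffer[]) drains the buffers in array order. It may write
            // only part of the data, so loop until the last buffer is empty.
            while (body.hasRemaining()) {
                sink.write(parts);
            }

            // Read everything back through the source end.
            ByteBuffer in = ByteBuffer.allocate(header.capacity() + body.capacity());
            while (in.hasRemaining()) {
                source.read(in);
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeHeaderAndBody());
    }
}
```

    The message is kept tiny on purpose: writer and reader share one thread here, so a payload larger than the pipe's internal buffer would block the write forever.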

    SourceChannel declaration

    
    public abstract static class SourceChannel extends AbstractSelectableChannel implements ReadableByteChannel, ScatteringByteChannel
    
    

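    AbstractSelectableChannel: the channel can be registered with a Selector
    ReadableByteChannel: the channel can be read from
    ScatteringByteChannel: supports scattering reads, which as I understand it means a single read call can fill several ByteBuffers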
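
    The ScatteringByteChannel in the declaration is the read-side counterpart: one read call can spread incoming bytes over several buffers. A minimal sketch follows; the class name, the 4-byte "header" and the payload are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the JDK.
class ScatteringReadDemo {

    // Splits an 8-byte message into a 4-byte header and a 4-byte body
    // using scattering reads on the source channel.
    static String[] readHeaderAndBody() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            ByteBuffer out = ByteBuffer.wrap("HEADbody".getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                sink.write(out);
            }

            ByteBuffer header = ByteBuffer.allocate(4);
            ByteBuffer body = ByteBuffer.allocate(4);
            ByteBuffer[] parts = {header, body};

            // read(ByteBuffer[]) fills the buffers in array order; loop until
            // the last one is full in case a call returns early.
            while (body.hasRemaining()) {
                source.read(parts);
            }

            header.flip();
            body.flip();
            return new String[] {
                StandardCharsets.UTF_8.decode(header).toString(),
                StandardCharsets.UTF_8.decode(body).toString()
            };
        }
    }

    public static void main(String[] args) throws IOException {
        String[] result = readHeaderAndBody();
        System.out.println(result[0] + " / " + result[1]);
    }
}
```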

    A simple example

    Charset charset = Charset.forName("utf-8");
    
    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sinkChannel = pipe.sink();
    Pipe.SourceChannel sourceChannel = pipe.source();
    
    // buffer holding the bytes to write
    byte[] bytes = "message".getBytes(charset);
    ByteBuffer writeBuffer = ByteBuffer.wrap(bytes);
    
    // buffer to read into
    ByteBuffer readBuffer = ByteBuffer.allocate(bytes.length);
    
    sinkChannel.write(writeBuffer);
    sourceChannel.read(readBuffer);
    
    // switch the buffer to read mode
    readBuffer.flip();
    
    System.out.println(charset.decode(readBuffer).toString());
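
    The example above relies on a single read() returning the whole message. A slightly more defensive sketch, under the assumption that the writer closes the sink when it is done, reads in a loop until read() reports end of stream (-1). The class name and the deliberately small 4-byte chunk are illustrative choices.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the JDK.
class PipeEndOfStreamDemo {

    // Writes a message, closes the sink, then drains the source until EOF.
    static String readAll() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source()) {
            try (Pipe.SinkChannel sink = pipe.sink()) {
                ByteBuffer out = ByteBuffer.wrap("message".getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }
            } // closing the sink lets the reader see end of stream

            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            ByteBuffer chunk = ByteBuffer.allocate(4); // smaller than the message on purpose
            while (source.read(chunk) != -1) {
                chunk.flip();
                collected.write(chunk.array(), 0, chunk.limit());
                chunk.clear();
            }
            // Decode only after all bytes are collected, so multi-byte
            // characters split across chunks are handled correctly.
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll());
    }
}
```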
    

    Blocking and non-blocking mode

    
    // non-blocking mode, and nothing has been written to sinkChannel
    sourceChannel.configureBlocking(false);
    System.out.println("read ..");
    sourceChannel.read(readBuffer);
    // prints: read end ..0
    System.out.println("read end .." + readBuffer.flip().remaining());
    
    readBuffer.clear();
    
    // blocking mode, and nothing has been written to sinkChannel
    sourceChannel.configureBlocking(true);
    System.out.println("read ..");
    sourceChannel.read(readBuffer);
    // the read above blocks, so this line is never reached
    System.out.println("read end ..");
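
    In blocking mode the read only returns once data arrives, so in practice the two ends usually live on different threads. The sketch below assumes a writer thread that waits 200 ms before writing three bytes; the class name, the delay and the payload are all made up for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Hypothetical demo class, not part of the JDK.
class BlockingReadDemo {

    // Returns the number of bytes delivered by a blocking read that is
    // released by a write from another thread.
    static int blockingRead() throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(200); // give the reader time to block
                    sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                } catch (IOException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            });
            writer.start();

            ByteBuffer readBuffer = ByteBuffer.allocate(16);
            // Channels are blocking by default: this call waits for the writer.
            int n = source.read(readBuffer);
            writer.join();
            return n;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read returned " + blockingRead() + " byte(s)");
    }
}
```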
    

    Using a Pipe with a Selector

    Selector selector = Selector.open();
    sourceChannel.configureBlocking(false);     // channels are blocking by default; register() throws IllegalBlockingModeException otherwise
    sourceChannel.register(selector, SelectionKey.OP_READ);
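
    Registration alone does not show how the data is picked up, so here is a minimal sketch of a full select-and-read round trip. The class name, the "ping" payload and the one-second select timeout are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class, not part of the JDK.
class PipeSelectorDemo {

    // Registers the source end for OP_READ, writes to the sink end and
    // reads the data once the selector reports the key as readable.
    static String selectAndRead() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            source.configureBlocking(false); // required before register()
            source.register(selector, SelectionKey.OP_READ);

            sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

            StringBuilder received = new StringBuilder();
            if (selector.select(1000) > 0) {
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // selected keys must be removed by hand
                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(16);
                        ((ReadableByteChannel) key.channel()).read(buf);
                        buf.flip();
                        received.append(StandardCharsets.UTF_8.decode(buf));
                    }
                }
            }
            return received.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(selectAndRead());
    }
}
```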
    

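    PipeImpl, the Pipe implementation in OpenJDK 10 (the socket-based variant below appears to be the one from the Windows source tree)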

    
    /**
     * A simple Pipe implementation based on a socket connection.
     * (i.e. implemented on top of a socket)
     */
    
    class PipeImpl extends Pipe
    {
        ...
        // generates the random bytes used for the handshake
        private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
    
        // the two ends of the pipe: SourceChannel and SinkChannel
        private SourceChannel source;
        private SinkChannel sink;
    
        PipeImpl(final SelectorProvider sp) throws IOException {
            AccessController.doPrivileged(new Initializer(sp));
        }
        public SourceChannel source() {
            return source;
        }
        public SinkChannel sink() {
            return sink;
        }
    
        private class Initializer implements PrivilegedExceptionAction<Void> {
    
            @Override
            public Void run() throws IOException {
                LoopbackConnector connector = new LoopbackConnector();
                connector.run();
                ...
                return null;
            }
    
            private class LoopbackConnector implements Runnable {
    
                @Override
                public void run() {
                    // used only to establish the sc1/sc2 connection; closed once that is done
                    ServerSocketChannel ssc = null;
    
                    // used by the SourceChannel, for reading
                    SocketChannel sc1 = null;
    
                    // used by the SinkChannel, for writing
                    SocketChannel sc2 = null;
    
                    try {
                        // the two ByteBuffers used for the handshake
                        ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
                        ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
    
                        // loopback address
                        InetAddress lb = InetAddress.getByName("127.0.0.1");
                       
                        InetSocketAddress sa = null;
                        for(;;) {    
                            if (ssc == null || !ssc.isOpen()) {
                                // open a ServerSocketChannel and wait for a connection
                                ssc = ServerSocketChannel.open();
                                ssc.socket().bind(new InetSocketAddress(lb, 0));
                                sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                            }
    
                            // establish the connection
                            sc1 = SocketChannel.open(sa);
    
                            // generate the random handshake bytes
                            RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                            // send them
                            do {
                                sc1.write(secret);
                            } while (secret.hasRemaining());
                            secret.rewind();
    
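                            // accept the connection
                            sc2 = ssc.accept();
                            do {
                                // read the handshake bytes
                                sc2.read(bb);
                                // loop until bb is full so that no handshake bytes are missed
                            } while (bb.hasRemaining());
                            bb.rewind();
                            
                            if (bb.equals(secret))
                                // handshake succeeded, leave the loop
                                break;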
    
                            sc2.close();
                            sc1.close();
                        }
    
                        source = new SourceChannelImpl(sp, sc1);
                        sink = new SinkChannelImpl(sp, sc2);
                    } catch (IOException e) {
                        ...
                    } finally {
                        ...
                    }
                }
            }
        }
    }
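
    The handshake in the excerpt can be tried in isolation. The following is a rough sketch under stated assumptions, not the JDK code: the class name is invented, NUM_SECRET_BYTES is assumed to be 16 (the excerpt does not show its value), and the retry loop is left out so a failed handshake simply returns false.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;

// Hypothetical sketch of a loopback handshake, not the JDK implementation.
class LoopbackHandshakeSketch {

    // Assumed value; the excerpt above does not show the real constant.
    static final int NUM_SECRET_BYTES = 16;

    // Connects to a loopback server socket, sends random bytes from the
    // connecting side and checks that the accepted side receives the same bytes.
    static boolean handshake() throws IOException {
        InetAddress lb = InetAddress.getLoopbackAddress();
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(lb, 0)); // port 0: let the OS pick a free port
            try (SocketChannel connecting = SocketChannel.open(ssc.getLocalAddress());
                 SocketChannel accepted = ssc.accept()) {
                ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
                new SecureRandom().nextBytes(secret.array());
                while (secret.hasRemaining()) {
                    connecting.write(secret);
                }
                secret.rewind();

                ByteBuffer received = ByteBuffer.allocate(NUM_SECRET_BYTES);
                while (received.hasRemaining()) {
                    if (accepted.read(received) == -1) {
                        return false; // peer closed before sending the full secret
                    }
                }
                received.rewind();

                // Matching bytes mean the accepted connection is the one we opened.
                return received.equals(secret);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("handshake ok: " + handshake());
    }
}
```

    The random secret guards against some other local process connecting to the listening port first: a connection is only trusted once it delivers the expected bytes.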
    

    SourceChannelImpl

    class SourceChannelImpl extends Pipe.SourceChannel implements SelChImpl{
      ...
      public int read(ByteBuffer dst) throws IOException {
            try {
                return sc.read(dst);
            } catch (AsynchronousCloseException x) {
                close();
                throw x;
            }
        }
      ...
    }
    

    SinkChannelImpl is much the same, with write in place of read.



    P.P.S. My knowledge is limited; if you spot any mistakes, please point them out.
