美文网首页java.io源码解析
java.io源码解析(五)--管道字节流(PipedInput

java.io源码解析(五)--管道字节流(PipedInput

作者: dark丶Night | 来源:发表于2019-05-31 10:31 被阅读0次

声明:本系列只供本人自学使用,勿喷。

参考:https://www.cnblogs.com/skywang12345/p/io_04.html

管道流一般用于线程间的通讯。

大致流程:在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据,就可以实现,线程A和线程B的通信。

一、PipedInputStream

public class PipedInputStream extends InputStream{
    // 循环缓存数组,默认1024
    protected byte buffer[];
    // PipedOutputStream往PipedInputStream的buffer中写入数据的下一个位置
    protected int in = -1;
    // PipedInputStream从buffer中读数据的下一个位置
    protected int out = 0;
}
  • 构造器
    // 使得 PipedOutputStream与该PipedInputStream建立连接
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    // 构造未建立连接的PipedInputStream
    public PipedInputStream()
    public PipedInputStream(int pipeSize)

-核心方法

    // 建立连接
    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    // 是PipedOutputStream的write(int b)方法中调用的,使得PipedInputStream能receive
    protected synchronized void receive(int b) throws IOException{
        1.检查连接状态,checkStateForReceive();
        2.假如写入的所有数据已全部读完(即in==out),awaitSpace();
          将新写入的数据添加到buffer中
    }
    
    // 若 “写入管道” 已将 “读取管道” 的缓存buffer写满,则需要执行awaitSpace()操作
    // 唤醒“读取管道”的线程进行读取(读完即可清空buffer继续写入)
    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    // 接收byte[]数组
    // 在PipedOutputStream的write(byte b[], int off, int len)方法中调用,使得PipedInputStream能receive
    synchronized void receive(byte b[], int off, int len)  throws IOException{
        checkStateForReceive();
        writeSide = Thread.currentThread();//获取写入线程
        int bytesToTransfer = len;//PipedOutputStream写入的字节总数
        while (bytesToTransfer > 0) {
            // 第一次执行时,in=-1,out=0,证明buffer为空
            if (in == out)
                awaitSpace();

            // 计算可拷贝到buffer的字节总数 nextTransferAmount 
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);

            // 拷贝到buffer
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            // 已经写入1024个字节后(即buffer已满),需要将循环数组in归0
            if (in >= buffer.length) {
                in = 0;
            }
        }
}

    // 返回下一个字节
    public synchronized int read()  throws IOException
    // 将buffer的数据读取到 byte b[],当都读完后,清空buffer(即 int=-1,out=0)
    public synchronized int read(byte b[], int off, int len)  throws IOException 

二、PipedOutputStream

public class PipedOutputStream extends OutputStream{
    private PipedInputStream sink;
}
  • 构造器
    // 与PipedInputStream建立连接
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

    // 构造未建立连接的PipedOutputStream
    public PipedOutputStream() {
    }

-核心方法

    // 建立连接
    public synchronized void connect(PipedInputStream snk) throws IOException

    public void write(int b)  throws IOException{
        sink.receive(b);
    }

    public void write(byte b[], int off, int len) throws IOException{
        sink.receive(b, off, len);
    }
    
    // 清空PipedOutputStream
    // 目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }
    
    // 关闭PipedOutputStream并释放资源
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }

三、demo

public class Test3 {
    public static void main(String[] args) throws IOException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        receiver.getPipedInputStream().connect(sender.getPipedOutputStream());
        sender.start();
        receiver.start();
    }
}

class Sender extends Thread {
    private PipedOutputStream pipedOutputStream = new PipedOutputStream();

    public PipedOutputStream getPipedOutputStream() {
        return pipedOutputStream;
    }

    @Override
    public void run() {
        writeMessage();
    }

    private void writeMessage() {
        try {
            pipedOutputStream.write("哈喽,China".getBytes());
            pipedOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Receiver extends Thread {
    private PipedInputStream pipedInputStream = new PipedInputStream();

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {
        readMessage();
    }

    void readMessage() {
        try {
            byte[] bytes = new byte[1024];
            int len = 0;
            while ((len = pipedInputStream.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

相关文章

网友评论

    本文标题:java.io源码解析(五)--管道字节流(PipedInput

    本文链接:https://www.haomeiwen.com/subject/xiyvtctx.html