美文网首页
io.pipe 代码阅读

io.pipe 代码阅读

作者: wayyyy | 来源:发表于2022-04-18 05:04 被阅读0次
    func main() {
        reader, writer := io.Pipe()
    
        go func() {
            writer.Write([]byte("hello world"))
            defer writer.Close()
        }()
    
        buffer := make([]byte, 9)
        reader.Read(buffer)
        fmt.Println(string(buffer))
    
        buffer2 := make([]byte, 10)
        reader.Read(buffer2)
        fmt.Println(string(buffer2))
    
        time.Sleep(time.Second * 2)
    
        buffer3 := make([]byte, 10)
        _, err := reader.Read(buffer3)
        if err != nil {
            fmt.Println(err.Error())
        }
        fmt.Println(string(buffer3))
    
        reader.Close()
    
        select {}
    }
    

    输出:


    image.png
    pipe 数据结构
    type pipe struct {
        wrMu sync.Mutex // Serializes Write operations
        wrCh chan []byte
        rdCh chan int
    
        once sync.Once // Protects closing done
        done chan struct{}    // 用于事件通知,reader 关闭或者 writer 关闭。
        rerr onceError
        werr onceError
    }
    
    func Pipe() (*PipeReader, *PipeWriter) {
        p := &pipe{
            wrCh: make(chan []byte),
            rdCh: make(chan int),
            done: make(chan struct{}),
        }
        return &PipeReader{p}, &PipeWriter{p}
    }
    
    type PipeReader struct {
        p *pipe
    }
    
    type PipeWriter struct {
        p *pipe
    }
    
    pipeWriter
    func (w *PipeWriter) Write(data []byte) (n int, err error) {
        return w.p.Write(data)
    }
    
    func (p *pipe) Write(b []byte) (n int, err error) {
        select {
        case <-p.done:
            return 0, p.writeCloseError()
        default:
            p.wrMu.Lock()          // 首先需要获取锁,保证多个 writer 的串行化
            defer p.wrMu.Unlock()
        }
    
        for once := true; once || len(b) > 0; once = false {
            select {
            case p.wrCh <- b:
                nw := <-p.rdCh  // write 之后,会在这里阻塞,直到收到reader 读取后通告过来的读取长度。
                b = b[nw:]
                n += nw
            case <-p.done:
                return n, p.writeCloseError()
            }
        }
        return n, nil
    }
    
    pipeReader
    func (r *PipeReader) Read(data []byte) (n int, err error) {
        return r.p.Read(data)
    }
    
    func (p *pipe) Read(b []byte) (n int, err error) {
        select {
        case <-p.done:
            return 0, p.readCloseError()
        default:
        }
    
        select {
        case bw := <-p.wrCh:
            nr := copy(b, bw)
            p.rdCh <- nr
            return nr, nil
        case <-p.done:
            return 0, p.readCloseError()
        }
    }
    

    相关文章

      网友评论

          本文标题:io.pipe 代码阅读

          本文链接:https://www.haomeiwen.com/subject/qhsqxrtx.html