func main() {
reader, writer := io.Pipe()
go func() {
writer.Write([]byte("hello world"))
defer writer.Close()
}()
buffer := make([]byte, 9)
reader.Read(buffer)
fmt.Println(string(buffer))
buffer2 := make([]byte, 10)
reader.Read(buffer2)
fmt.Println(string(buffer2))
time.Sleep(time.Second * 2)
buffer3 := make([]byte, 10)
_, err := reader.Read(buffer3)
if err != nil {
fmt.Println(err.Error())
}
fmt.Println(string(buffer3))
reader.Close()
select {}
}
输出:
image.png
pipe 数据结构
type pipe struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan []byte
rdCh chan int
once sync.Once // Protects closing done
done chan struct{} // 用于事件通知,reader 关闭或者 writer 关闭。
rerr onceError
werr onceError
}
func Pipe() (*PipeReader, *PipeWriter) {
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}
type PipeReader struct {
p *pipe
}
type PipeWriter struct {
p *pipe
}
pipeWriter
func (w *PipeWriter) Write(data []byte) (n int, err error) {
return w.p.Write(data)
}
func (p *pipe) Write(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.writeCloseError()
default:
p.wrMu.Lock() // 首先需要获取锁,保证多个 writer 的串行化
defer p.wrMu.Unlock()
}
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh // write 之后,会在这里阻塞,直到收到reader 读取后通告过来的读取长度。
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
pipeReader
func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.Read(data)
}
func (p *pipe) Read(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
网友评论