美文网首页
Golang 使用tee将一个channel分拆成两个相同的ch

Golang 使用tee将一个channel分拆成两个相同的channel

作者: FredricZhu | 来源:发表于2019-06-18 11:12 被阅读0次

tees/channels.go

package tees

// TeeOp is a stateless receiver that groups the channel pipeline helpers
// (OrDone, Tee, Repeat, Take) defined in this package.
type TeeOp struct{}

// NewTeeOp returns a pointer to a freshly allocated, empty TeeOp.
func NewTeeOp() *TeeOp {
    return &TeeOp{}
}

// OrDone relays values from c onto the returned channel until either c is
// closed or done becomes readable. The returned channel is closed when the
// relaying goroutine exits.
func (teeOp *TeeOp) OrDone(
    done, c <-chan interface{},
) <-chan interface{} {
    relay := make(chan interface{})
    go func() {
        defer close(relay)
        for {
            var (
                item interface{}
                open bool
            )
            // Step 1: obtain the next value, unless cancelled first.
            select {
            case <-done:
                return
            case item, open = <-c:
            }
            if !open {
                return
            }
            // Step 2: hand the value on. If done fires instead, the value is
            // dropped and the next loop iteration re-checks done.
            select {
            case relay <- item:
            case <-done:
            }
        }
    }()
    return relay
}

// Tee reads values from in (via OrDone, so it stops when done is readable or
// in is closed) and delivers every value to both returned channels. Both
// channels are unbuffered, so a value must be accepted on both outputs (or
// done must fire) before the next value is read. Both outputs are closed when
// the forwarding goroutine exits.
func (teeOp *TeeOp) Tee(
    done <-chan interface{},
    in <-chan interface{},
) (_, _ <-chan interface{}) {
    first := make(chan interface{})
    second := make(chan interface{})
    go func() {
        // Deferred calls run in reverse order: second is closed before first.
        defer close(first)
        defer close(second)
        for item := range teeOp.OrDone(done, in) {
            // Per-item copies of the outputs. Once one of them has accepted
            // the item it is set to nil; a send on a nil channel blocks
            // forever, so that case can no longer be selected and the second
            // pass is forced onto the other output.
            pendingFirst, pendingSecond := first, second
            for delivered := 0; delivered < 2; delivered++ {
                select {
                case <-done:
                case pendingFirst <- item:
                    pendingFirst = nil
                case pendingSecond <- item:
                    pendingSecond = nil
                }
            }
        }
    }()
    return first, second
}

// Repeat returns a channel that emits the given args in order, cycling back
// to the first one after the last, until done becomes readable. The returned
// channel is closed when the producing goroutine exits.
//
// When called with no args there is nothing to emit: the goroutine simply
// waits for done and then closes the channel.
func (teeOp *TeeOp) Repeat(
    done <-chan interface{},
    args ...interface{},
) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        // Without this guard the inner range loop would have zero iterations,
        // so the outer loop would spin forever without ever observing done.
        if len(args) == 0 {
            <-done
            return
        }
        for {
            for _, v := range args {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

// Take forwards at most num values from valueStream onto the returned
// channel. It stops early when done becomes readable or when valueStream is
// closed. The returned channel is closed when the forwarding goroutine exits.
func (teeOp *TeeOp) Take(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})

    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            // Receive in its own select. Writing `takeStream <- <-valueStream`
            // as a select case evaluates the receive on entering the select,
            // so it would block on an idle source regardless of done and
            // would forward a nil value for every read of a closed source.
            var v interface{}
            select {
            case <-done:
                return
            case next, ok := <-valueStream:
                if !ok {
                    return
                }
                v = next
            }

            select {
            case <-done:
                return
            case takeStream <- v:
            }
        }
    }()
    return takeStream
}

teedemo.go

package main

import (
    "fmt"
    "teedemo/tees"
)

// main builds the pipeline Repeat(1, 2, 3) -> Take(6) -> Tee and prints each
// value as seen on both tee outputs, side by side.
func main() {
    teeOp := tees.NewTeeOp()
    done := make(chan interface{})
    defer close(done)

    source := teeOp.Repeat(done, 1, 2, 3)
    firstSix := teeOp.Take(done, source, 6)
    left, right := teeOp.Tee(done, firstSix)

    // Tee's outputs are unbuffered, so both sides must be read for every
    // value: read one from left, then its twin from right.
    for fromLeft := range left {
        fromRight := <-right
        fmt.Printf("out1: %v, out2: %v \n", fromLeft, fromRight)
    }
}

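程序输出如下(每个值都会同时出现在 out1 和 out2 中):

out1: 1, out2: 1
out1: 2, out2: 2
out1: 3, out2: 3
out1: 1, out2: 1
out1: 2, out2: 2
out1: 3, out2: 3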


image.png

相关文章

网友评论

      本文标题:Golang 使用tee将一个channel分拆成两个相同的ch

      本文链接:https://www.haomeiwen.com/subject/ycwxqctx.html