美文网首页
用Go语言实现ReactiveX(二)——Deliver

用Go语言实现ReactiveX(二)——Deliver

作者: 一个灰 | 来源:发表于2018-09-27 17:47 被阅读0次

    接上一篇用Go语言实现ReactiveX(一)——Observable

    本篇,我们来实现ReactiveX中的操作符,即数据传递者Deliver。这些操作符一般包括,过滤、组合、数学运算、转换等几个大类。

    Deliver既是Observable又是Observer,它接受一个或者多个Observable作为上一级的数据源,又可被订阅一次或者多次。

    实现要点

    1. 传递数据、complete事件、error事件
    2. 订阅和退订上级数据源
    3. 可被下一级观察者订阅和退订

    订阅上级数据源

    现在假设我们有一个Observable就是前一篇文章中的FromArray

    array:=int[]{1,2,3}
    observable:=FromArray(array)
    

    这个observable会依次发出1,2,3三个整数,然后complete。作为Deliver,我们可能需要去订阅这个observable,那么如何订阅呢?
    根据上篇所述,订阅行为就是传入一个Next和一个Stop。

    next:=make(Next)
    stop:=make(Stop)
    observable(next,stop)
    

    我们完成了订阅,但我们还需要对订阅后采集来自observable的数据。

    next:=make(Next)
    stop:=make(Stop)
    go observable(next,stop)
    for d:= range next{
      //处理数据
    }
    

    由于observable里面的逻辑会阻塞当前‘线程’,所以我们加了关键字go。

    退订上级数据源

    close(stop)
    

    随时都可以调用这个方法进行退订。

    传递数据

    真实的Deliver是这样定义的

    Deliver func(source Observable) Observable
    

    它是一个函数,接受一个Observable作为参数,返回一个Observable。
    展开出来就是这样的

    func deliver(source Observable) Observable {
        return func(next Next, stop Stop) {
            //deliver被订阅的时候就会执行这里面的逻辑
        }
    }
    

    我们可以在被订阅的时候,去订阅source,然后获取数据后传递给next管道

    func deliver(source Observable) Observable {
        return func(next Next, stop Stop) {
            dnext:= make(Next)
            go source(dnext,stop)
            for d:= range next{
                   next<-d
            }
            close(next)
        }
    }
    

    这样我就做好了一个什么也没用的数据传递者了。下面我们来实现一个有一点作用的filter

    Filter的实现

    func Filter(f func(interface{}) bool) Deliver {
        return func(source Observable) Observable {
            return func(next Next, stop Stop) {
                sNext := make(Next)
                go source(sNext, stop)
                for {
                    select {
                    case d, ok := <-sNext:
                        if !ok {
                            close(next)
                            return
                        }
                        if _, ok = d.(error); ok {
                            next <- d
                            close(next)
                            return
                        } else if f(d) {
                            next <- d
                        }
                    case <-stop:
                        return
                    }
                }
            }
        }
    }
    

    这里,我们用for select代替了for range,这样方便我们的接收到stop被close的时候发来的信息。我们判断了source是否complete,如果complete我们就close(next)——向下级发送complete信号。然后我们判断了数据是否是error类型,然后执行了filter函数来过滤数据。

    其他的Deliver都是沿用Filter这套模板来实现的。这是个死循环结构,所以订阅deliver也需要用go关键字,这个和Observable是一脉相承的。

    最后我们再看一个startwith操作符,也是一个十分常用的功能,用于在source前面加塞数据。如果有更好的表达方式,欢迎留言。

    func StartWith(xs ...interface{}) Deliver {
        return func(source Observable) Observable {
            return func(next Next, s Stop) {
                stopped := false
                go func() {
                    <-s
                    stopped = true
                }()
                for d := range xs {
                    if stopped {
                        return
                    }
                    next <- d
                }
                if !stopped {
                    source(next, s)
                }
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:用Go语言实现ReactiveX(二)——Deliver

          本文链接:https://www.haomeiwen.com/subject/plmeoftx.html