美文网首页
用Go语言实现ReactiveX(二)——Deliver

用Go语言实现ReactiveX(二)——Deliver

作者: 一个灰 | 来源:发表于2018-09-27 17:47 被阅读0次

接上一篇用Go语言实现ReactiveX(一)——Observable

本篇,我们来实现ReactiveX中的操作符,即数据传递者Deliver。这些操作符一般包括,过滤、组合、数学运算、转换等几个大类。

Deliver既是Observable又是Observer,它接受一个或者多个Observable作为上一级的数据源,又可被订阅一次或者多次。

实现要点

  1. 传递数据、complete事件、error事件
  2. 订阅和退订上级数据源
  3. 可被下一级观察者订阅和退订

订阅上级数据源

现在假设我们有一个Observable就是前一篇文章中的FromArray

array:=int[]{1,2,3}
observable:=FromArray(array)

这个observable会依次发出1,2,3三个整数,然后complete。作为Deliver,我们可能需要去订阅这个observable,那么如何订阅呢?
根据上篇所述,订阅行为就是传入一个Next和一个Stop。

next:=make(Next)
stop:=make(Stop)
observable(next,stop)

我们完成了订阅,但我们还需要对订阅后采集来自observable的数据。

next:=make(Next)
stop:=make(Stop)
go observable(next,stop)
for d:= range next{
  //处理数据
}

由于observable里面的逻辑会阻塞当前‘线程’,所以我们加了关键字go。

退订上级数据源

close(stop)

随时都可以调用这个方法进行退订。

传递数据

真实的Deliver是这样定义的

Deliver func(source Observable) Observable

它是一个函数,接受一个Observable作为参数,返回一个Observable。
展开出来就是这样的

func deliver(source Observable) Observable {
    return func(next Next, stop Stop) {
        //deliver被订阅的时候就会执行这里面的逻辑
    }
}

我们可以在被订阅的时候,去订阅source,然后获取数据后传递给next管道

func deliver(source Observable) Observable {
    return func(next Next, stop Stop) {
        dnext:= make(Next)
        go source(dnext,stop)
        for d:= range next{
               next<-d
        }
        close(next)
    }
}

这样我就做好了一个什么也没用的数据传递者了。下面我们来实现一个有一点作用的filter

Filter的实现

func Filter(f func(interface{}) bool) Deliver {
    return func(source Observable) Observable {
        return func(next Next, stop Stop) {
            sNext := make(Next)
            go source(sNext, stop)
            for {
                select {
                case d, ok := <-sNext:
                    if !ok {
                        close(next)
                        return
                    }
                    if _, ok = d.(error); ok {
                        next <- d
                        close(next)
                        return
                    } else if f(d) {
                        next <- d
                    }
                case <-stop:
                    return
                }
            }
        }
    }
}

这里,我们用for select代替了for range,这样方便我们的接收到stop被close的时候发来的信息。我们判断了source是否complete,如果complete我们就close(next)——向下级发送complete信号。然后我们判断了数据是否是error类型,然后执行了filter函数来过滤数据。

其他的Deliver都是沿用Filter这套模板来实现的。这是个死循环结构,所以订阅deliver也需要用go关键字,这个和Observable是一脉相承的。

最后我们再看一个startwith操作符,也是一个十分常用的功能,用于在source前面加塞数据。如果有更好的表达方式,欢迎留言。

func StartWith(xs ...interface{}) Deliver {
    return func(source Observable) Observable {
        return func(next Next, s Stop) {
            stopped := false
            go func() {
                <-s
                stopped = true
            }()
            for d := range xs {
                if stopped {
                    return
                }
                next <- d
            }
            if !stopped {
                source(next, s)
            }
        }
    }
}

相关文章

网友评论

      本文标题:用Go语言实现ReactiveX(二)——Deliver

      本文链接:https://www.haomeiwen.com/subject/plmeoftx.html