美文网首页
用Go语言实现ReactiveX(一)——Observable

用Go语言实现ReactiveX(一)——Observable

作者: 一个灰 | 来源:发表于2018-09-27 10:23 被阅读0次

    用Go语言实现ReactiveX有很大的挑战,Go语言本身没有类的继承,所以无法采用基类来做一些封装操作。不过好在Go语言是有闭包和匿名函数。所以可以实现ReactiveX
    https://github.com/langhuihui/GoRx

    影响设计ReactiveX的要素

    • 没有类的继承
    • 有匿名函数
    • 有闭包
    • 强类型,没有泛型
    • goroutine代替异步

    实现生产者Observable

    1. 发送数据
    2. 完成事件
    3. error事件
    4. 被订阅
    5. 被取消订阅

    发送数据功能

    有两种方式可以实现,一种是直接调用回调函数,和javascript一样。这种方式的局限性在于代码相对啰嗦,因为golang的函数定义必须是有类型的,会涉及到更多的类型断言的操作,匿名函数使用起来也比javascript的要更麻烦一些。第二种方式是采用channel来传递数据,这种方式更加go方式一点。所以我后来采取了第二种方式实现。(第一种也尝试过)
    简而言之,核心就是一个chan interface{},一个无缓冲的channel用来发送数据。这个channel是由Observer传递进来的(类似于回调的概念)

    type Next chan interface{}
    
    Observable <------Next----- Observer     //subscribe
    Observable
          Next-----data----> Observer       //next
    

    被订阅

    当Observable接收到用于发送数据的channel的时候,就是被订阅的时候。见上图。

    完成事件

    利用close一个channel会产生一个事件的方式进行触发。

    Observable  close(Next)  ------> Observer              (complete)
    

    Observer通过对channel读取操作,如果第二个参数返回false(channel已经被关闭)代表complete

    data,ok:=<-next
    if !ok{
    //complete
    }
    

    error事件

    由于golang对异常捕获目前上不健全,所以暂时就通过next channel发送错误对象,在Observer中对数据类型进行类型断言,如果是error类型,则认为收到了错误事件。

    被取消订阅(dispose)

    这个事件是由Observer向Observable发出的
    我们定义了一个新的channel :chan bool。成为stop channel专门用来做这个事情,这个channel不发送任何数据,只用来close的时候广播这个事件。

    type Stop chan bool
    

    channel在close的时候,所有等待接受数据的goroutine均能接受到这个关闭事件,这是其他语言不具备的优势。

    Obserable <-------Next、Stop---------- Observer  //subscribe
                      <--------- close stop ----------- Observer  //dispose
    

    案例:FromArray

    func FromArray(array []interface{}) Observable {
        return func(n Next, s Stop) {
            for _, item := range array {
                select {
                case <-s:
                    return
                default:
                    n <- item
                }
            }
            close(n)
        }
    }
    

    我们看到FromArray是一个函数,调用FromArray(数组或切片),会返回一个Observable。Observable是一个函数

    type Obserable func(Next, Stop)
    

    我们遍历传入的数组或切片,然后向Next管道传入数组中的元素,假如Stop被关闭,我们也能即使取消数据发送。
    当所有数据发送完毕我们关闭Next管道,发出complete信号。
    (未完待续)

    相关文章

      网友评论

          本文标题:用Go语言实现ReactiveX(一)——Observable

          本文链接:https://www.haomeiwen.com/subject/tncdoftx.html