美文网首页Front End
[FE] 函数响应式流库探秘

[FE] 函数响应式流库探秘

作者: 何幻 | 来源:发表于2016-08-04 12:41 被阅读80次

    xstream是专门为cycle.js定制开发的函数响应式流库(functional reactive stream library)。
    它很简洁,只提供了Stream,Listener,Producer,MemoryStream四个概念。
    我们先来学习xstream,然后再挖掘流(stream)与CPS的关系。

    1. xstream的用法

    (1)流(stream)

    流可以看做一个事件流,流上面可以绑定多个监听器,
    当流中某事件发生的时,会自动广播。

    有了流之后,我们就可以对流的整体进行操作了。
    在xstream中对流进行变换,是通过operator实现的,
    operator处理一个或多个流,返回一个新的流。

    let stream2=stream1.map(/*...*/);
    let stream3=stream2.filter(/*...*/);
    

    如上,mapfilter就是operator

    (2)监听器(listener)

    监听器用于处理当前发生的事件,时刻接受流中对所发生事件的广播。
    在xstream中,监听器是一个包含nexterrorcomplete方法的对象,
    流中每次事件发生,都会自动调用监听器的next方法,
    流中有错误发生时,会调用error方法,
    整个流停止,不再有事件发生时,调用complete方法。

    let listener={
        next:val=>{/*...*/},
        error:err=>{/*...*/},
        complete:()=>{/*...*/}
    };
    

    (3)生产者(producer)

    生产者用来生成流。
    它是一个包含startstop方法的对象,用于表示流的开始和终止。
    start函数中会使用listener,因此,listenernext方法实际上是在这里调用的。

    import xs from 'xstream';
    
    let producer={
        start(listener){
            // listener.next(/*...*/)
        },
        stop(){/*...*/}
    };
    
    let stream=xs.create(producer);
    

    (4)有记忆的流(MemoryStream)

    有记忆的流,和普通的流在operator方面和listener方面并无二致,
    唯一不同的是,有记忆的流可以将当前事件中的值传给下一个事件。
    (这里对主题帮助不大,我们暂且略过

    2. 例子

    我们学习了xstream的API,现在终于可以看到它的全貌了,

    import xs from 'xstream';
    
    let producer = {
        start: listener => {
            let i = 0;
            while (++i) {
                if (i > 10) {
                    break;
                }
    
                listener.next(i);
            }
        },
        stop: () => { }
    };
    
    let stream1 = xs.create(producer);
    let stream2 = stream1.map(x => x * 2);
    
    stream2.addListener({
        next: val => console.log(val),
        error: val => { },
        complete: () => { }
    });
    

    最后结果会输出从2到20的偶数。

    3. CPS

    我们看到实际上是在流中调用了listener,即通过listener.next(i)广播了i
    然后,流经历了一系列的变换,导致流广播的值发生了改变,
    体现到最后的listener中,接收的值就不是最开始的i了,
    而是i经历了x=> x*2之后的值i*2

    (1)对流进行抽象

    认识到问题的本质后,我们可以将流看成以下形式,

    let stream = cont => {
        let i = 0;
        while (++i) {
            if (i > 10) {
                break;
            }
    
            cont(i);
        }
    }
    

    其中,cont表示continuation
    (continuation的话题比较大,这里不影响阅读,暂略

    (2)挂载listener

    然后我们先不考虑对流进行变换,我们直接模拟挂载listener的场景,

    stream(x => console.log(x));
    

    好了,这个时候,实际上我们是将流的continuation传给了它,
    结果自然是输出从1到10的数字了。

    (3)对流进行变换

    我们怎样对流进行变换呢,
    实际上,我们需要做的就是将一个流变成另一个流,
    或者说白了,就是改变cont,然后进行传递(CPS

    这可能比较晦涩难懂,我们直接看例子吧,模拟一下x=>x*2
    (这是可以运行的

    let stream1 = cont => {
        let i = 0;
        while (++i) {
            if (i > 10) {
                break;
            }
    
            cont(i);
        }
    };
    
    let stream2 = cont => {
        let newCont = v => cont(v * 2);
        stream1(newCont);
    };
    // 简写为
    // let stream2 = cont => stream1(v => cont(v * 2));
    
    stream2(x=>console.log(x));
    

    (4)实现mapfiltermerge

    我们来尝试实现xstream中几个常用的operator,它们都返回一个新的流。

    //map是对流中的每个值进行变换
    let map = function (fn) {
        let stream = this;
        return cont => stream(x => cont(fn(x)));
    };
    let stream2 = map.call(stream1, x => x * 2);
    
    //filter是对流中的值进行过滤
    let filter = function (fn) {
        let stream = this;
        return cont => stream(x => fn(x) && cont(x));
    };
    let stream3 = filter.call(stream1, x => x % 2 != 0);
    
    //merge是合并两个流
    let merge = function (otherStream) {
        let stream = this;
        return cont => {
            stream(cont);
            otherStream(cont);
        };
    };
    let stream4 = merge.call(stream2, stream3);
    

    4. 总结与展望

    xstream采用了流的概念,实现了事件源与事件处理逻辑的分离,
    而且,对流的变换都是一些纯函数,组合起来更方便,
    因此成就了cycle.js这个优美的框架,从而MVI全新的架构模式破土而出,
    这一切,一定会在人机交互界面的解决方案上开启新的篇章啊。

    5. 参考

    xstream
    Cycle.js Document

    相关文章

      网友评论

        本文标题:[FE] 函数响应式流库探秘

        本文链接:https://www.haomeiwen.com/subject/paycsttx.html