RxJava 总结

作者: 我叫陆大旭 | 来源:发表于2017-07-03 14:47 被阅读333次
    RxJava

    Hi~ 我是RxJava

    现在RxJava有1.0和2.0二个版本。这里描述的内容都是主要基于1.0版本,2.0相关内容会稍微提及。

    介绍

    一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

    项目主页

    http://reactivex.io/

    中文文档

    https://mcxiaoke.gitbooks.io/rxdocs/content/

    API文档

    http://reactivex.io/RxJava/javadoc/

    项目地址

    https://github.com/ReactiveX/RxJava
    https://github.com/ReactiveX/RxAndroid

    基本概念

    0.基本模式
    image.png
    1.Observable被观察者,事件源。

    在观察者模式中被观察对象。处于流程的上游。

    2.Observer

    观察者,接收源。在观察者模式中观察者。
    需要实现以下的方法:

    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。

    • onError(Exception ex)
      当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。

    • onComplete
      正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

    3.Subscriber

    订阅者,其实也是接收者。比Observer多了一个Subscription。

     public abstract class Subscriber<T> implements Observer<T>, Subscription {
     ......
     }
    
    public interface Subscription {
        /**
         * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
         * was received.
         * <p>
         * This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
         * onCompleted is called).
         */
        void unsubscribe();
    
        /**
         * Indicates whether this {@code Subscription} is currently unsubscribed.
         *
         * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
         */
        boolean isUnsubscribed();
    }
    
    • unsubscribe();
      取消订阅
    • isUnsubscribed();
      判断是否订阅

    取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个Observable来说,即使没有观察者了,它也可以在一个while循环中继续生成并尝试发射数据项。

    4.Operator:操作符

    几种主要的需求

    直接创建一个Observable(创建操作)

    • 组合多个Observable(组合操作)
    • 对Observable发射的数据执行变换操作(变换操作)
    • 从Observable发射的数据中取特定的值(过滤操作)
    • 转发Observable的部分值(条件/布尔/过滤操作)
    • 对Observable发射的数据序列求值(算术/聚合操作)

    如果你想实现你自己的操作符,可以参考这里:实现自定义操作符

    5.Scheduler:调度器,可以切换主线程和各个线程。
    • Schedulers.immediate()

    在当前线程运行,相当于不切换线程。这是默认的 Scheduler。

    • Schedulers.newThread()

    总是启用新线程,并在新线程执行操作。

    • Schedulers.io()

    I/O 操作(读写文件、数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

    • Schedulers.computation()

    计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

    • AndroidSchedulers.mainThread()

    切换到主线程,指定的操作将在Android 主线程运行。

    8.ActionX:没有返回值的函数

    Action0.class

    public interface Action0 extends Action {
        void call();
    }
    

    Action1.class

    public interface Action1<T> extends Action {
        void call(T t);
    }
    
    

    Action2.class

    public interface Action2<T1, T2> extends Action {
        void call(T1 t1, T2 t2);
    }
    
    
    9.FuncX:有返回值的函数

    Func0.class

    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    

    Func1.class

    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    

    Func2.class

    public interface Func2<T1, T2, R> extends Function {
        R call(T1 t1, T2 t2);
    }
    

    与Action的区别在于是否返回参数。

    操作符

    创建操作

    用于创建Observable的操作符

    • create:通过调用观察者的方法从头创建一个Observable
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Observable Create call");
                    subscriber.onCompleted();
                }
            });
    
    • empty:创建行为受限的特殊Observable
     Observable deferObservable = Observable.empty();
    
    • from:将其它的对象或数据结构转换为Observable
    List<String> list = new ArrayList<>();
            list.add("from1");
            list.add("from2");
            list.add("from3");
            Observable fromObservable = Observable.from(list);  //遍历list 每次发送一个
    

    变换操作

    这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档

    • buffer:缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
    Observable rangeObservable = Observable.from(list).buffer(3);
    
    • map:映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
    Observable observable = Observable.from(list).map(new Func1<String, String>() {
                @Override
                public String call(String string) {
                    return string + " map";
                }
            });
    

    过滤操作

    这些操作符用于从Observable发射的数据中进行选择

    • filter:过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
    Observable.just(1, 2, 3, 4, 5, 6)
                    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer>4;
                        }
                    }).subscribe(ObserverFactory.createIntObserver());
    
    • first:首项,只发射满足条件的第一条数据
    Observable.just(1, 2, 3, 4, 5, 6)
                    .first(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer>2;
                        }
                    }).subscribe(ObserverFactory.createIntObserver());
    

    算术和聚合操作

    这些操作符可用于整个数据序列
    Concat: 不交错的连接多个Observable的数据

    Observable<Integer> source1 =  Observable.just(1, 2, 5, 6);
            Observable<Integer> source2 =  Observable.just(7, 6, 5, 6);
            Observable.concat(source1,source2).distinct().subscribe(ObserverFactory.createIntObserver());
    

    辅助操作

    一组用于处理Observable的操作符

    • delay:延迟一段时间发射结果数据
     Observable justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"
            justObservable.delay(3000, TimeUnit.MILLISECONDS).subscribe(ObserverFactory.createObserver());
    
    • observeOn:指定观察者观察Observable的调度程序(工作线程)
     Observable justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"
            justObservable.observeOn(Schedulers.immediate()).subscribe(ObserverFactory.createObserver());
    

    更多操作符例子

    https://github.com/iamludaxu/ae/tree/master/app/src/test/java/gift/witch/android/ae/rxjava

    其他相关内容

    给 Android 开发者的 RxJava 详解 by 抛物线
    关于RxJava最友好的文章
    关于RxJava最友好的文章(进阶)
    RxJava全部操作符例子
    给初学者的RxJava2.0教程 系列

    相关文章

      网友评论

        本文标题:RxJava 总结

        本文链接:https://www.haomeiwen.com/subject/chvucxtx.html