Rxjava

作者: cxlin007 | 来源:发表于2017-09-29 14:16 被阅读6次

    Rxjava 源于ReactiveX(Reactive Extensions),Rx是一个变成模型,目标是提供一致的编程接口,帮助开发者方便的处理异步数据流,许多流行的变成语言都有Rx库。Rx扩展了观察者模式,用于支持数据和事件序列,添加了一些操作符,他人你可以声明式的组合这些序列,而无需关注底层的实现(线程、同步、线程安全、并发)。

    Rxjava优势就是使得代码更简洁,而且随着程序逻辑变得越来越复杂,它依然能够保持简洁。

    扩展的观察者

    Observable(被观察者)、Observer(观察者)、subscribe(订阅),Observable和Observer通过subscribe方法实现订阅关系。Observable在需要的时候发出事件来通知Observer。

    与传统的观察者模式不同,除了定义了普通的回调事件onNext(),还定义的两个特殊的事件onCompleted()和onError()。

    • onCompleted:事件队列完结,Rxjava把所有事件看做一个队列,并规定,当不会再有新的onNext事件触发时,需要触发onCompleted
    • onError:事件队列异常,在时间处理过程中出现异常时触发onError,同时队列终止,不允许再有事件发出
    • onCompleted和onError有且只有一个,并且是事件序列的最后一个
    基本实现

    创建Observer,定义事件触发时的具体行为

    Observer<String> observer = new Observer<String>() {
       @Override
       public void onNext(String s) {
           Log.d(tag, "Item: " + s);
       }
    
       @Override
       public void onCompleted() {
           Log.d(tag, "Completed!");
       }
    
       @Override
       public void onError(Throwable e) {
           Log.d(tag, "Error!");
       }
    };
    

    RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。
    与Observer 的主要区别:

    1. onStart:这是Subscriber 新增的方法,会在subscribe里,事件还未发送前调用,它总是在subscribe所发生的线程被调用
      2、unsubscribe:这是Subscriber 新增的方法,用于取消订阅,这个方法被调用后,Subscriber将不再接收事件。Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
    Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
               .throttleFirst(1, TimeUnit.SECONDS)
               .doOnUnsubscribe(new Action0() {
                   @Override
                   public void call() {
                       Log.e(TAG, "clicks->doOnUnsubscribe");
                   }
               })
               .subscribe(new Action1<Void>() {
                   @Override
                   public void call(Void aVoid) {
                       methd6();
                   }
               });
       //维护相关的资源引用
       subscriptions.add(clickSubscribe);
    
    @Override
    protected void onDestroy() {
       for (Subscription s : subscriptions) {
           if (!s.isUnsubscribed()) {
               s.unsubscribe();
               Log.e(TAG, "onDestroy: 取消订阅!");
           }
       }
       super.onDestroy();
    }
    

    创建Observable,定义被观察者,决定什么时候出发事件以及触发怎么样的事件

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    

    创建Observable的操作符

    • create(OnSubscribe)
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    
    • just(T...): 将传入的参数依次发送出来。
    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    
    • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    
    • empty 创建一个什么都不做直接通知完成的Observable
    • error 创建一个什么都不做直接通知错误的Observable
    • never 创建一个什么都不做的Observable
    Observable observable1=Observable.empty();//直接调用onCompleted。
    Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
    Observable observable3=Observable.never();//啥都不做
    
    • timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>
    Observable.timer(1000,TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("JG",aLong.toString()); // 0
                    }
                });
    
    • interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
     Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                         //每隔1秒发送数据项,从0开始计数
                         //0,1,2,3....
                    }
                });
    
    • range 创建一个发射指定范围的整数序列的Observable<Integer>
     Observable.range(2,5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());// 2,3,4,5,6 从2开始发射5个数据
            }
        });
    
    • defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。
     Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.just("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("JG",s);
            }
        });
    

    合并操作符

    • concat: 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)。
    Observable<Integer> observable1=Observable.just(1,2,3,4);
        Observable<Integer>  observable2=Observable.just(4,5,6);
    
        Observable.concat(observable1,observable2)
                .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
    
    • startWith: 在数据序列的开头增加一项数据。startWith的内部也是调用了concat
    Observable.just(1,2,3,4,5)
                .startWith(6,7,8)
        .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
    
    • merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。
    • zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。
    Observable<Integer>  observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);
    
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
            @Override
            public String call(Integer item1, Integer item2) {
                return item1+"and"+item2;
            }
        })
        .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
    
    • combineLatest 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。

    过滤操作

    • filter 过滤数据。内部通过OnSubscribeFilter过滤数据。
    Observable.just(3,4,5,6)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>4;
                    }
                })
        .subscribe(item->Log.d("JG",item.toString())); //5,6 
    

    还有ofType、take、takeLast、throttleFirst、timeout等

    条件/布尔操作

    • all 判断所有的数据项是否满足某个条件,内部通过OperatorAll实现。
     Observable.just(2,3,4,5)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>3;
                    }
                })
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d("JG",aBoolean.toString()); //false
            }
        })
        ;
    

    还有exists、contains、isEmpty等

    聚合操作

    • reduce: 对序列使用reduce()函数并发射最终的结果,内部使用OnSubscribeReduce实现。
     Observable.just(2,3,4,5)
                .reduce(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer sum, Integer item) {
                        return sum+item;
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));//14
    

    还有collect、count/countLong

    转换操作

    • toList 收集原始Observable发射的所有数据到一个列表,然后返回这个列表
     Observable.just(2,3,4,5)
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
    
                    }
                });
    

    还有toSortedList、toMap、toMultiMap

    变换操作

    • map 对Observable发射的每一项数据都应用一个函数来变换。
    Observable.just(6,2,3,4,5)
                .map(integer -> "item:"+integer)
                .subscribe(s -> Log.d("JG",s));//item:6,item:2....
    

    还有cast、flatMap、flatMapIterable、concatMap、switchMap等

    错误处理/重试机制

    • onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable。
     Observable.just(1,"2",3)
        .cast(Integer.class)
        .onErrorResumeNext(Observable.just(1,2,3))
        .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
        ;
    
    • onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。
    • onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据。
    Observable.just(1,"2",3)
                .cast(Integer.class)
                .onErrorReturn(new Func1<Throwable, Integer>() {
                    @Override
                    public Integer call(Throwable throwable) {
                        return 4;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());1,4
            }
        });
    
    • retry: 当原始Observable在遇到错误时进行重试。
    Observable.just(1,"2",3)
        .cast(Integer.class)
        .retry(3)
        .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
        ;//1,1,1,1,onError
    
    • retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry。
    Observable.just(1,"2",3)
        .cast(Integer.class)
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
            @Override
            public Observable<Long> call(Observable<? extends Throwable> observable) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        })
        .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
        //1,1
    

    还有一些操作符具体还是看RxJava操作符大全这个文章吧

    线程控制

    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制

    • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
    • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。

    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread) 
        .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
    

    subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

    doOnSubscribe()它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

    Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);
    
    相关资料

    ReactiveX 的理念和特点
    给 Android 开发者的 RxJava 详解
    RxJava操作符大全

    相关文章

      网友评论

          本文标题:Rxjava

          本文链接:https://www.haomeiwen.com/subject/ergyextx.html