RxJava 2 Basics

Author: Kael_祈求者 | Published: 2019-05-10 09:30

    (1) The five base classes of RxJava 2 and an outline of this chapter

    [Figure: outline diagram of this chapter]

    (1) Flowable --> 0..N flows, supporting Reactive-Streams and backpressure.
    (2) Observable --> 0..N flows, no backpressure.
    (3) Single --> a flow of exactly 1 item or an error.
    (4) Completable --> a flow without items but only a completion or error signal.
    (5) Maybe --> a flow with no items, exactly one item or an error.

    Type                   Class        Interface          Consumer
    0..N backpressured     Flowable     Publisher          Subscriber
    0..N unbounded         Observable   ObservableSource   Observer
    1 element or error     Single       SingleSource       SingleObserver
    0..1 element or error  Maybe        MaybeSource        MaybeObserver
    0 element or error     Completable  CompletableSource  CompletableObserver

    Observable (the observed source) / Observer (the observer)
    Flowable (the observed source) / Subscriber (the observer)

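    (1) How is the backpressure problem solved?

    Flowable.create(new FlowableOnSubscribe<Integer>() { ... }, BackpressureStrategy.BUFFER)
    BUFFER: buffers every item in an unbounded queue until the downstream consumes it; with a slow consumer this can end in an OutOfMemoryError.
    MISSING: neither drops nor buffers; items are emitted with no backpressure handling, so the downstream has to cope with overflow itself, for example with sample, throttleFirst, or buffer(1, TimeUnit.SECONDS).
    ERROR: signals a MissingBackpressureException when the downstream cannot keep up.
    DROP: discards the items that the downstream cannot keep up with.
    LATEST: keeps only the most recent item for the downstream, so earlier undelivered items may be overwritten and lost.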
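The differences between the strategies are easier to see when the consumer asks for fewer items than the source pushes. The following is a minimal sketch, assuming RxJava 2.x (`io.reactivex.rxjava2:rxjava`) is on the classpath and Java 8 lambdas are available; the class name `BackpressureStrategyDemo` and the counts 1000 and 10 are illustrative choices, not part of the original article.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public final class BackpressureStrategyDemo {

    /**
     * Pushes 0..999 synchronously at a subscriber that has requested only
     * 10 items, using the given strategy for everything beyond that.
     */
    public static TestSubscriber<Integer> run(BackpressureStrategy strategy) {
        return Flowable.<Integer>create(emitter -> {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, strategy).test(10); // TestSubscriber with an initial request of 10
    }

    public static void main(String[] args) {
        BackpressureStrategy[] strategies = {
            BackpressureStrategy.BUFFER, BackpressureStrategy.DROP,
            BackpressureStrategy.LATEST, BackpressureStrategy.ERROR
        };
        for (BackpressureStrategy strategy : strategies) {
            TestSubscriber<Integer> subscriber = run(strategy);
            System.out.println(strategy + ": received=" + subscriber.valueCount()
                    + ", errors=" + subscriber.errors());
        }
    }
}
```

In this setup every strategy delivers the 10 requested items; they differ in what happens to the remaining 990. DROP discards them and completes, ERROR terminates with a MissingBackpressureException, LATEST holds on to the last value until more is requested, and BUFFER keeps all of them queued.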
    

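    (2) The cost of handling backpressure

    Handling backpressure requires extra processing logic, so Flowable runs somewhat slower than Observable.
    Where backpressure is not a concern, prefer Observable.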
    

    In RxJava 2.x, an Observable is subscribed to with an Observer and does not support backpressure, whereas a Flowable is subscribed to with a Subscriber and does support backpressure.

      // No backpressure support: memory keeps growing and can eventually lead to an OOM
        public void ClickOne(View view) {
            // The source emits one event every 1 ms (interval uses the computation scheduler by default)
            Observable.interval(1, TimeUnit.MILLISECONDS)
                    // Move the observer's work onto a new thread
                    .observeOn(Schedulers.newThread())
                    // The observer only handles one event every 1000 ms
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            // New in RxJava 2: called after subscribing and before any data is sent;
                            // the Disposable can be used to cancel the subscription
                        }

                        @Override
                        public void onNext(Long aLong) {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            Log.w("TAG", "---->" + aLong);
                        }

                        @Override
                        public void onError(Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    });
        }
    
            // Backpressure is supported
            Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10000; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }, BackpressureStrategy.DROP)  // choose the backpressure strategy
                    .subscribeOn(Schedulers.computation())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d("TAG", integer.toString());
                            Thread.sleep(1000);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.d("TAG", throwable.toString());
                        }
                    });
    
    

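    One more point to note: a Flowable does not start sending data as soon as it is subscribed to; it only starts once Subscription#request is called. The simplified subscribe overloads request Long.MAX_VALUE by default.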

            Flowable.range(1, 10)
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            Log.w("TAG", "onSubscribe start");
                            // Nothing is emitted until request() is called
                            s.request(Integer.MAX_VALUE);
                            Log.w("TAG", "onSubscribe end");
                        }

                        @Override
                        public void onNext(Integer value) {
                            Log.w("TAG", "---->" + value);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.w("TAG", "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.w("TAG", "onComplete");
                        }
                    });
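Because emission is driven by request(), a subscriber can also pull items one at a time instead of asking for everything up front. The sketch below assumes RxJava 2.x and the Reactive Streams API on the classpath; the class name `RequestOneByOne` and the recorded log strings are hypothetical, chosen only to make the signal order visible.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

public final class RequestOneByOne {

    /** Consumes Flowable.range(1, 3) one item per request and records every signal. */
    public static List<String> run() {
        final List<String> log = new ArrayList<>();
        Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                log.add("subscribed");
                s.request(1); // ask for the first item only
            }

            @Override
            public void onNext(Integer value) {
                log.add("next:" + value);
                subscription.request(1); // ask for the next item once this one is handled
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the request(1) call in onSubscribe were removed, the subscriber would receive no items at all, which is the behavior the paragraph above describes.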
    

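    Single, Completable and Maybe are simplified variants of Observable that provide only part of its functionality.
    Single: emits either exactly one item or one error notification, never a completion notification; the item and the error are mutually exclusive.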

        // Single: a single-result operation
        // Emits either one item or one error notification; it cannot emit a completion notification.
        public void ClickThree(View view) {
            Single.create(new SingleOnSubscribe<String>() {
                @Override
                public void subscribe(SingleEmitter<String> emitter) throws Exception {
                    emitter.onSuccess("success");
                }
            }).subscribe(new SingleObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "Disposable");
                }
    
                @Override
                public void onSuccess(String s) {
                    Log.e("TAG", s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("TAG", e.toString());
                }
            });
        }
    

    Completable: emits either one completion notification or one error notification and never emits data; the two notifications are mutually exclusive.

        // Completable
        // Emits either one completion notification or one error notification; it cannot emit data.
        public void ClickFour(View view) {
            Completable.create(new CompletableOnSubscribe() {
                @Override
                public void subscribe(CompletableEmitter emitter) throws Exception {
                    emitter.onComplete();
                }
            }).subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe");
                }
    
                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("TAG", "onError" + e.getMessage());
                }
            });
        }
    

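    Maybe: emits exactly one of three signals: a single item (onSuccess), a completion notification (onComplete), or an error notification (onError). The three are mutually exclusive, and any call made after the first signal is ignored.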

       // Maybe
        // Maybe emits a single item, a completion notification, or an error notification
        public void ClickFive(View view) {
            Maybe.create(new MaybeOnSubscribe<String>() {
                @Override
                public void subscribe(MaybeEmitter<String> emitter) throws Exception {
                    emitter.onError(new Exception("test exception"));
                    // Only the first signal counts; the calls below would be ignored
                    //emitter.onSuccess("111");
                    //emitter.onComplete();
                }
            }).subscribe(new MaybeObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe");
                }
    
                @Override
                public void onSuccess(String s) {
                    Log.e("TAG", "onSuccess" + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("TAG", "onError" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete");
                }
            });
        }
    

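    II. Thread scheduling
    (1) What is thread scheduling?
    Controlling which threads the data flow and its operations run on, and switching between them
    (2) How to switch threads

    subscribeOn()
        Specifies the thread on which the source runs
        .subscribeOn(Schedulers.newThread()) makes the source emit its data on a new thread

    observeOn()
        Specifies the thread on which the observer receives data
        .observeOn(Schedulers.newThread()) makes the observer receive data on a new thread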
    

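    (3) Built-in schedulers (Schedulers)
    Schedulers.computation()
    For computational work: by default the number of worker threads equals the number of available CPUs
    Schedulers.from(executor)
    Custom thread pool: wraps an Executor that you provide, so tasks run on your own pool
    Schedulers.immediate()
    Runs on the current thread. This scheduler exists only in RxJava 1.x; it was removed in 2.x, where Schedulers.trampoline() is the closest equivalent
    Schedulers.io()
    For I/O work: worker threads are cached for a while, and the pool grows as needed
    Schedulers.newThread()
    Creates a new thread for each unit of work
    Schedulers.single()
    A single worker thread; queued tasks run one after another in FIFO order
    Schedulers.trampoline()
    Runs on the current thread; if that thread is already running a task, the new task is queued and runs once the current one finishes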

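    In short, subscribeOn() specifies the thread on which events are emitted, and observeOn() specifies the thread on which the subscriber receives them.
    If subscribeOn() is called several times, only the first call (the one closest to the source) determines the emitting thread; the others have no effect on it.
    observeOn(), by contrast, can be applied repeatedly: each call switches the thread used by everything downstream of it.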
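The rule about repeated subscribeOn() and observeOn() calls can be checked by recording thread names along a chain. This is a minimal sketch, assuming RxJava 2.x on the classpath; the class `SchedulerSwitchDemo`, the helper `named`, and the thread names A to D are hypothetical and exist only to make the threads recognizable.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class SchedulerSwitchDemo {

    // Hypothetical helper: a single-thread executor whose thread has a readable name.
    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Records the thread name seen at the source and after each observeOn(). */
    public static List<String> trace() {
        ExecutorService a = named("A");
        ExecutorService b = named("B");
        ExecutorService c = named("C");
        ExecutorService d = named("D");
        List<String> trace = Collections.synchronizedList(new ArrayList<String>());
        try {
            Observable.fromCallable(() -> {
                        trace.add("emit:" + Thread.currentThread().getName());
                        return 1;
                    })
                    .subscribeOn(Schedulers.from(a)) // closest to the source: decides the emitting thread
                    .subscribeOn(Schedulers.from(b)) // does not change where the source emits
                    .observeOn(Schedulers.from(c))   // downstream runs on C from here
                    .doOnNext(v -> trace.add("first:" + Thread.currentThread().getName()))
                    .observeOn(Schedulers.from(d))   // downstream runs on D from here
                    .doOnNext(v -> trace.add("second:" + Thread.currentThread().getName()))
                    .blockingSubscribe();            // block the caller until the chain completes
        } finally {
            a.shutdown();
            b.shutdown();
            c.shutdown();
            d.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Named executors are used instead of Schedulers.newThread() so that the recorded names are stable; with the built-in schedulers the same switching happens, but the thread names are generated by RxJava.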

    III. Operators

    Creating Observables
    Transforming Observables
    Filtering Observables
    Combining Observables
    Error Handling Operators
    Observable Utility Operators
    Conditional and Boolean Operators
    Mathematical and Aggregate Operators
    Backpressure Operators
    Connectable Observable Operators
    Operators to Convert Observables
    

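    Operator summary:
    creating, transforming, filtering, combining, aggregating, error handling, backpressure handling, connecting
    source.operator1().operator2().operator3().subscribe(consumer);
    (1) There are too many operators to cover them all, so only a few common ones are described here. For the others, see the official documentation: http://reactivex.io/documentation/operators.html
    1. Creating Observables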

    Create — create an Observable from scratch by calling observer methods programmatically
    Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
    Empty/Never/Throw — create Observables that have very precise and limited behavior
    From — convert some other object or data structure into an Observable
    Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
    Just — convert an object or a set of objects into an Observable that emits that or those objects
    Range — create an Observable that emits a range of sequential integers
    Repeat — create an Observable that emits a particular item or sequence of items repeatedly
    Start — create an Observable that emits the return value of a function
    Timer — create an Observable that emits a single item after a given delay
    

    Example

    Observable.range(1, 10).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    Flowable.range(1, 10).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    

    2. Transforming Observables

    Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
    FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
    GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
    Map — transform the items emitted by an Observable by applying a function to each item
    Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
    Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
    

    Example

    File folder = new File(FOLDER_PATH_P);
    Observable.fromArray(folder.listFiles()).flatMap(new Function<File, ObservableSource<File>>() {
        @Override
        public ObservableSource<File> apply(File t) throws Exception {
            // listFiles() returns null for anything that is not a readable directory
            File[] children = t.listFiles();
            return children == null ? Observable.<File>empty() : Observable.fromArray(children);
        }
    }).map(new Function<File, String>() {
        @Override
        public String apply(File t) throws Exception {
            return t.getName();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("accept:" + t);
        }
    });
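For the transforming operators that have no example above, here is a small sketch of scan, buffer and flatMap on in-memory data. It assumes RxJava 2.x on the classpath and Java 8 lambdas; the class name `TransformDemo` and the sample inputs are illustrative only.

```java
import io.reactivex.Observable;

import java.util.List;

public final class TransformDemo {

    /** scan: emits the running total of 1..5 after each item. */
    public static List<Integer> runningTotals() {
        return Observable.range(1, 5)
                .scan((sum, next) -> sum + next)
                .toList()
                .blockingGet();
    }

    /** buffer: groups 1..5 into lists of at most two items. */
    public static List<List<Integer>> pairs() {
        return Observable.range(1, 5)
                .buffer(2)
                .toList()
                .blockingGet();
    }

    /** flatMap: expands each word into an Observable of its letters and flattens the result. */
    public static List<String> letters() {
        return Observable.just("ab", "cd")
                .flatMap(word -> Observable.fromArray(word.split("")))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(runningTotals());
        System.out.println(pairs());
        System.out.println(letters());
    }
}
```

The inner sources in letters() are synchronous, so the letters come out in source order here; with asynchronous inner sources flatMap may interleave them.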
    

    3. Filtering Observables

    Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
    Distinct — suppress duplicate items emitted by an Observable
    ElementAt — emit only item n emitted by an Observable
    Filter — emit only those items from an Observable that pass a predicate test
    First — emit only the first item, or the first item that meets a condition, from an Observable
    IgnoreElements — do not emit any items from an Observable but mirror its termination notification
    Last — emit only the last item emitted by an Observable
    Sample — emit the most recent item emitted by an Observable within periodic time intervals
    Skip — suppress the first n items emitted by an Observable
    SkipLast — suppress the last n items emitted by an Observable
    Take — emit only the first n items emitted by an Observable
    TakeLast — emit only the last n items emitted by an Observable
    

    Example:

    // Fragment: applied in the middle of an Observable<String> chain
    .filter(new Predicate<String>() {
        @Override
        public boolean test(String t) throws Exception {
            return t.endsWith("png");
        }
    })
    Observable.just(1, 2, 3, 4).take(2).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    

    4. Combining Observables

    And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
    CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
    Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
    Merge — combine multiple Observables into one by merging their emissions
    StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
    Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
    Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
    

    Example:

    Observable<File> o1 = Observable.fromArray(folder1.listFiles());
    Observable<File> o2 = Observable.fromArray(folder2.listFiles());
    o1.mergeWith(o2).map(new Function<File, String>() {
        @Override
        public String apply(File t) throws Exception {
            return t.getName();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println(t);
        }
    });
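Besides merge, zip and startWith from the list above are easy to try on small inputs. The following sketch assumes RxJava 2.x (where `startWith(T)` is available) and Java 8 lambdas; the class name `CombineDemo` and the sample values are hypothetical.

```java
import io.reactivex.Observable;

import java.util.List;

public final class CombineDemo {

    /** zip: pairs items by position and stops when the shorter source completes. */
    public static List<String> zipped() {
        return Observable.zip(
                        Observable.just("a", "b", "c"),
                        Observable.range(1, 2),
                        (letter, number) -> letter + number)
                .toList()
                .blockingGet();
    }

    /** startWith: emits the given item before the items of the source. */
    public static List<Integer> prefixed() {
        return Observable.just(2, 3)
                .startWith(1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(zipped());
        System.out.println(prefixed());
    }
}
```

In zipped() the third letter has no partner, so it is never emitted.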
    

    5. Error Handling Operators

    Catch — recover from an onError notification by continuing the sequence without error
    Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
    

    Example:

    // Fragment: the second Consumer receives the error
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("accept:" + t);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable t) throws Exception {
            System.out.println("accept: error");
        }
    });
    .retry(n): when an error occurs, resubscribes up to n times, so the source runs at most n+1 times
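The "n retries means n+1 executions" rule can be verified by counting subscriptions to a source that fails. This is a minimal sketch, assuming RxJava 2.x on the classpath; the class `RetryDemo`, its method names, and the simulated IOException are illustrative, and onErrorReturnItem stands in for the Catch operator from the list above.

```java
import io.reactivex.Observable;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public final class RetryDemo {

    /** Counts how often a source that always fails is run under retry(retries). */
    public static int attempts(int retries) {
        AtomicInteger attempts = new AtomicInteger();
        Observable.<String>fromCallable(() -> {
                    attempts.incrementAndGet();
                    throw new IOException("simulated failure");
                })
                .retry(retries)
                .subscribe(value -> { }, error -> { }); // swallow the final error for the demo
        return attempts.get();
    }

    /** The source fails twice, then succeeds; a fallback is used if the retries run out first. */
    public static String recover(int retries) {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.fromCallable(() -> {
                    if (attempts.incrementAndGet() <= 2) {
                        throw new IOException("simulated failure");
                    }
                    return "ok after " + attempts.get() + " attempts";
                })
                .retry(retries)
                .onErrorReturnItem("fallback")
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(attempts(3));
        System.out.println(recover(1));
        System.out.println(recover(2));
    }
}
```

Placing onErrorReturnItem after retry means the fallback is only used once all retries have failed.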
    

    6. Observable Utility Operators

    Delay — shift the emissions from an Observable forward in time by a particular amount
    Do — register an action to take upon a variety of Observable lifecycle events
    Materialize/Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
    ObserveOn — specify the scheduler on which an observer will observe this Observable
    Serialize — force an Observable to make serialized calls and to be well-behaved
    Subscribe — operate upon the emissions and notifications from an Observable
    SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
    TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
    Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
    Timestamp — attach a timestamp to each item emitted by an Observable
    Using — create a disposable resource that has the same lifespan as the Observable
    

    Example:

    int total = 6;
    Observable<Long> observable = Observable.intervalRange(0, total, 0, 1, TimeUnit.SECONDS);
    // Counts up from 0, one value per second, 6 values in total
    

    7. Conditional and Boolean Operators

    All — determine whether all items emitted by an Observable meet some criteria
    Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
    Contains — determine whether an Observable emits a particular item or not
    DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
    SequenceEqual — determine whether two Observables emit the same sequence of items
    SkipUntil — discard items emitted by an Observable until a second Observable emits an item
    SkipWhile — discard items emitted by an Observable until a specified condition becomes false
    TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
    TakeWhile — discard items emitted by an Observable after a specified condition becomes false
    

    Example:

    Observable<Integer> observable = Observable.just(1, 2, 3, 4);
    observable.contains(2).subscribe(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean t1, Throwable t2) throws Exception {
            System.out.println(t1);
        }
    });
    // prints: true
    
    SkipUntil:
    Observable<Long> observable1 = Observable.interval(1, TimeUnit.SECONDS).take(4);
    Observable<Integer> observable2 = Observable.just(1).delay(3, TimeUnit.SECONDS);
    observable1.skipUntil(observable2).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long t) throws Exception {
            System.out.println(t);
        }
    });
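    // typically prints 2 and 3; the gate opens at about the same moment item 2 is emitted, so 2 can occasionally be missed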
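Since the skipUntil example above depends on real timers, a deterministic variant can be written with virtual time. The sketch below assumes RxJava 2.x on the classpath; the class name `SkipUntilDemo` is hypothetical, and the gate is moved to 2500 ms (instead of 3 s) so that it does not coincide with a tick of the interval.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.util.List;
import java.util.concurrent.TimeUnit;

public final class SkipUntilDemo {

    /** Items 0..3 tick once per virtual second; the gate opens at 2500 ms. */
    public static List<Long> run() {
        TestScheduler clock = new TestScheduler(); // virtual time, advanced manually
        TestObserver<Long> observer = Observable.interval(1, TimeUnit.SECONDS, clock)
                .take(4)
                .skipUntil(Observable.timer(2500, TimeUnit.MILLISECONDS, clock))
                .test();

        clock.advanceTimeBy(5, TimeUnit.SECONDS); // run all scheduled work up to the 5 s mark
        observer.assertComplete();
        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Items 0 and 1 arrive before the gate opens and are skipped; items 2 and 3 arrive afterwards and pass through.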
    

    8. Mathematical and Aggregate Operators

    Average — calculates the average of numbers emitted by an Observable and emits this average
    Concat — emit the emissions from two or more Observables without interleaving them
    Count — count the number of items emitted by the source Observable and emit only this value
    Max — determine, and emit, the maximum-valued item emitted by an Observable
    Min — determine, and emit, the minimum-valued item emitted by an Observable
    Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
    Sum — calculate the sum of numbers emitted by an Observable and emit this sum
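    Special computations (such as average, max, min and sum) require an extra library: 'io.reactivex:rxjava-math:1.0.0' (this artifact was built for RxJava 1.x, so check compatibility before using it with 2.x)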
    

    Example:

    Observable.just(folder).flatMap(new Function<File, Observable<File>>() {
        @Override
        public Observable<File> apply(File t) throws Exception {
            return Observable.fromArray(t.listFiles());
        }
    }).count().subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long t) throws Exception {
            System.out.println("count:" + t);
        }
    });
    // prints the number of files
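Simple aggregates such as a sum or a maximum can also be expressed with the built-in reduce and count operators, without an extra math library. This is a small sketch, assuming RxJava 2.x on the classpath; the class name `AggregateDemo` and the sample values are illustrative.

```java
import io.reactivex.Observable;

public final class AggregateDemo {

    /** reduce with a seed: sums 1..4 starting from 0. */
    public static int sum() {
        return Observable.range(1, 4)
                .reduce(0, (total, next) -> total + next)
                .blockingGet();
    }

    /** reduce without a seed: keeps the larger of each pair, yielding the maximum. */
    public static int max() {
        return Observable.just(3, 9, 4)
                .reduce((a, b) -> Math.max(a, b))
                .blockingGet();
    }

    /** count: emits the number of items as a Long. */
    public static long count() {
        return Observable.just("a", "b", "c")
                .count()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(sum() + " " + max() + " " + count());
    }
}
```

The seedless reduce returns a Maybe, because an empty source has no result; max() is only safe here because the source is known to be non-empty.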
    

    9. Connectable Observable Operators

    Connect — instruct a connectable Observable to begin emitting items to its subscribers
    Publish — convert an ordinary Observable into a connectable Observable
    RefCount — make a Connectable Observable behave like an ordinary Observable
    Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items
    

    Example:

    ConnectableObservable<Long> c1 = Observable.interval(1, TimeUnit.SECONDS).publish();
    c1.subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long t) throws Exception {
            System.out.println(t);
        }
    });
    c1.connect();
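To see what connect() changes, the following sketch attaches two observers to a published source and checks that nothing flows until connect() is called, and that the source is subscribed only once. It assumes RxJava 2.x on the classpath; the class name `ConnectableDemo` and the summary string it returns are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public final class ConnectableDemo {

    /** Returns "<idle before connect> <first observer's items> <second observer's items> <source subscriptions>". */
    public static String run() {
        AtomicInteger sourceSubscriptions = new AtomicInteger();
        ConnectableObservable<Integer> shared = Observable.range(1, 3)
                .doOnSubscribe(d -> sourceSubscriptions.incrementAndGet())
                .publish();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        shared.subscribe(value -> first.add(value));
        shared.subscribe(value -> second.add(value));

        // Nothing has been emitted yet: the source is not subscribed until connect()
        boolean idleBeforeConnect = first.isEmpty() && second.isEmpty();

        shared.connect(); // the single upstream subscription starts here

        return idleBeforeConnect + " " + first + " " + second + " " + sourceSubscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both observers share one upstream subscription, which is the main difference from subscribing twice to a plain Observable.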
    

    10. Operators to Convert Observables

    To — convert an Observable into another object or data structure
    

    Example:

    Observable.just(1, 2).toList().subscribe(new Consumer<List<Integer>>() {
        @Override
        public void accept(List<Integer> t) throws Exception {
            System.out.println(t);
        }
    });
    
