RxJava2

作者: simplehych | 来源:发表于2019-01-30 15:47 被阅读0次

    本篇文章使用的版本

    implementation "io.reactivex.rxjava2:rxjava:2.2.5"
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    

    五种观察者模式

    背压机制:在异步场景下,被观察者发送事件的速度 远快于 观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
    在进行异步操作时会通过缓存来存储发射的数据。
    在 RxJava1.x 时,这些缓存是无界的,当需要缓存的数据非常多的时候,会占用非常多的存储空间,并有可能因为虚拟机不断 GC 而导致程序执行过慢,甚至 OOM。
    在 RxJava2.x 中,大多数的异步操作内存都存在一个有界的缓存,当超出这个缓存的时候会抛出 MissingBackpressureException 异常并结束整个队列。

    注意:2.0版本 .subscribe(Observer) 方法没有返回值 void,如有需要可参考使用 subscribeWith();但是 不完全参数public final Disposable subscribe(Consumer<? super T> onNext) 有返回值。

    1. Observable.subscribe(Observer)
      不支持背压
    // package io.reactivex;
    public abstract class Observable<T> implements ObservableSource<T>
    
    // package io.reactivex;
    public interface ObservableSource<T> {
        /**
         * 注意:2.0版本没有返回值 void
         * 如有需要可参考使用 subscribeWith()
         */
        void subscribe(@NonNull Observer<? super T> observer);
    }
    
    // package io.reactivex;
    public interface Observer<T> {
        void onSubscribe(@NonNull Disposable d);
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
    
    // package io.reactivex.disposables;
    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    
    1. Flowable.subscribe(Subscriber)
      支持背压
    // package io.reactivex;
    public abstract class Flowable<T> implements Publisher<T> 
    
    // package org.reactivestreams;
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    
    // package org.reactivestreams;
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
    // package org.reactivestreams;
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }
    
    1. Single. subscribe(SingleObserver)
      只有一个 onSuccess 或者 onError
    // package io.reactivex;
    public abstract class Single<T> implements SingleSource<T>
    
    // package io.reactivex;
    public interface SingleSource<T> {
        void subscribe(@NonNull SingleObserver<? super T> observer);
    }
    
    // package io.reactivex;
    public interface SingleObserver<T> {
        void onSubscribe(@NonNull Disposable d);
        void onSuccess(@NonNull T t);
        void onError(@NonNull Throwable e);
    }
    
    // package io.reactivex.disposables;
    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    
    1. Completable.subscribe(CompletableObserver)
      只有一个 onComplete 或者 onError
    // package io.reactivex;
    public abstract class Completable implements CompletableSource 
    
    // package io.reactivex;
    public interface CompletableSource {
        void subscribe(@NonNull CompletableObserver co);
    }
    
    // package io.reactivex;
    public interface CompletableObserver {
        void onSubscribe(@NonNull Disposable d);
        void onComplete();
        void onError(@NonNull Throwable e);
    }
    
    // package io.reactivex.disposables;
    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    
    1. Maybe. subscribe(MaybeObserver)
      只有一个 onSuccess 或者 onError 或者 onComplete
    // package io.reactivex;
    public abstract class Maybe<T> implements MaybeSource<T> 
    
    // package io.reactivex;
    public interface MaybeSource<T> {
        void subscribe(@NonNull MaybeObserver<? super T> observer);
    }
    
    // package io.reactivex;
    public interface MaybeObserver<T> {
        void onSubscribe(@NonNull Disposable d);
        void onSuccess(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
    
    // package io.reactivex.disposables;
    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    

    Observable 操作符使用

    看图即可,之后具体分析可忽略,直接看下一节


    RxJava2操作符

    创建操作符

    create()

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
    

    just()
    不超过10个

    public static <T> Observable<T> just(T item)
    ...
    public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
    

    from 系列

    public static <T> Observable<T> fromArray(T... items)
    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
    public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
    public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
    public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
    

    defer()
    只有在观察者订阅的时候才会创建被观察者

    public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
    
    Observable.defer(new Callable<ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> call() throws Exception {
            return Observable.just(1);
        }
    });
    

    timer()
    当到指定时间后发送一个 0L 的值给观察者

    public static Observable<Long> timer(long delay, TimeUnit unit)
    

    interval
    每隔一段时间发送一个事件,不断增1的数字

    public static Observable<Long> interval(long period, TimeUnit unit)
    

    intervalRange

    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    

    range

    public static Observable<Integer> range(final int start, final int count)
    

    rangeLong

    public static Observable<Long> rangeLong(long start, long count)
    

    empty & never & error

    // 不发送任何事件
    public static <T> Observable<T> never()
    // 发送 onComplete() 事件
    public static <T> Observable<T> empty()
    // 发送 onError() 事件
    public static <T> Observable<T> error(final Throwable exception)
    

    repeat() & repeatWhen()

    转换操作符

    map() & cast()
    一对一


    map cast

    flatMap()& concatMap()
    一对多,flatMap不保证事件的顺序,concatMap与上游发送的顺序一致

    buffer()
    将整个事件流进行分组

    Observable.range(1,7)
            .buffer(3)
            .subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    Log.i(TAG,integers.toString());
                    Log.i(TAG,"----");
                }
            });
    
    输出
    01-29 15:03:09.570 I/TestRxJava2Operate: [1, 2, 3]
    01-29 15:03:09.570 I/TestRxJava2Operate: ----
    01-29 15:03:09.570 I/TestRxJava2Operate: [4, 5, 6]
    01-29 15:03:09.570 I/TestRxJava2Operate: ----
    01-29 15:03:09.570 I/TestRxJava2Operate: [7]
    01-29 15:03:09.570 I/TestRxJava2Operate: ----
    
    buffer

    groupBy()

    groupBy

    scan()
    累加器 accumulator,依次输出


    scan

    reduce()
    和 scan() 一样累加,只是只发送最后一个值

    window()
    和 buffer 类似,但不是发射来自 Observable 的数据包,发射的是 Observable,最后发射一个 onCompleted 通知
    [图片上传失败...(image-d3834c-1548834337169)]

    过滤操作符

    filter()
    规则过滤

    distinct() & distinctUntilChanged()
    去重过滤

    skip() & skipLast() & skipWhile() & skipUntil()
    过滤掉前几项

    take() & takeLast() & takeUntil() & takeWhile()
    只保留前几项

    elementAt() & firstElement() & lastElement()
    获取队列中指定位置的事件

    ignoreElements()
    过滤掉所有队列中的事件,只保留 onComplete / onError

    throttleFirst() & throttleLast & throttleLatest & throttleWithTimeout
    对时间进行切片,选取第一个,最后一个,最近的一个
    throttleLast 底层使用的 sample 方法实现

    throttleWithTimeout 底层使用的 debounce 方法实现;仅在过了一段指定的时间还没有发射数据时才发射一个数据,如果在一个事件片达到之前,发射的数据之后又紧跟着发射了一个数据,那么这个时间片 之前发射的数据会被丢弃

    debounce()
    用来限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据,否则丢弃之前的数据


    debounce

    sample()
    throttleLast 内部实现调用的是 sample


    sample

    ofType()
    过滤掉不符合该类型的事件

    组合操作符

    startWith() & startWithArray()
    在事件队列之前插入数据

    merge() & mergeArray() & mergeDelayError & mergeArrayDelayError()
    多个事件队列合并起来发射,可能交错无序

    merge() 和 mergeError() 只有在处理错误 onError 时不同。mergeError() 在错误之前所有事件发射完毕之后才把错误发射出来,多个错误只发射一个错误;
    merge() 在遇到错误时,直接抛出来结束操作

    concat() & concatArray() & concatEager()& concatDelayError & concatArrayDelayError()
    多个事件队列合并起来发射,严格按顺序发射

    concatEager方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有ObservableSource,并且会先缓存这些ObservableSource发射的数据,然后再按照顺序将它们发射出来;

    zip() & zipArray() & zipIterable()
    将多个数据项合并,可以指定合并规则

    combineLatest() & combineLatestDelayError()
    组合最近的俩个数据


    image

    collect()
    将数据收集到数据结构中

    Observable.just(1, 2, 3, 4)
    .collect(new Callable < ArrayList < Integer >> () {
        @Override
        public ArrayList < Integer > call() throws Exception {
            return new ArrayList < > ();
        }
    },
    new BiConsumer < ArrayList < Integer > , Integer > () {
        @Override
        public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
            integers.add(integer);
        }
    })
    .subscribe(new Consumer < ArrayList < Integer >> () {
        @Override
        public void accept(ArrayList < Integer > integers) throws Exception {
            Log.d(TAG, "===============accept " + integers);
        }
    });
    

    辅助操作符

    delay()
    在发送事件前延迟
    doXXX 系列

    doOnSubscribe,在 subscribe() 订阅之前触发
    doOnLifecycle,
    doOnEach,在每个 onNext() 调用之前触发
    doOnNext,在 onNext() 之前触发
    doAfterNext,在 onNext() 方法之后触发
    doOnTerminate,在 Observable 终止onComplete() / onError()之前触发
    doOnComplete,在 onComplete() 之前触发
    doOnError,在 onError() 之前触发
    doFinally,在 onComplete() 或 onError() 结束之后触发
    doAfterTerminate,在 Observable 终止之后触发
    doOnDispose,在 dispose 之前触发
    

    输出示例:

    01-28 16:08:30.460 I/TestRxJava2: doOnSubscribe 
    01-28 16:08:30.460 I/TestRxJava2: doOnLifecycle Consumer
    01-28 16:08:30.460 I/TestRxJava2: Observer onSubscribe 
    01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onNext 0
    01-28 16:08:30.460 I/TestRxJava2: doOnEach 
    01-28 16:08:30.460 I/TestRxJava2: doOnNext 
    01-28 16:08:30.460 I/TestRxJava2: Observer onNext 0
    01-28 16:08:30.460 I/TestRxJava2: doAfterNext 
    01-28 16:08:30.460 I/TestRxJava2: ObservableEmitter onComplete
    01-28 16:08:30.460 I/TestRxJava2: doOnTerminate 
    01-28 16:08:30.460 I/TestRxJava2: doOnComplete 
    01-28 16:08:30.460 I/TestRxJava2: doOnEach 
    01-28 16:08:30.460 I/TestRxJava2: Observer onComplete
    01-28 16:08:30.460 I/TestRxJava2: doFinally 
    01-28 16:08:30.460 I/TestRxJava2: doAfterTerminate 
    

    subscribeOn() & observeOn()
    线程切换,subscribeOn()指定观察者运行的线程;observeOn()指定被观察者发射事件所运行的线程

    timeout()
    设置超时时间,指定时间内没有任何数据,就执行我们的数据项,后续事件不再发送

    错误处理操作符

    onErrorReturn() & onErrorResumeNext() & onExceptionResumeNext()
    onErrorReturn(): 在触发 onError 的时候用一个值代替,并调用 onCompleted() 结束本次队列,而不会将错误传递给观察者
    onErrorResumeNext(): 用 Observable 代替
    onExceptionResumeNext(): 只处理 Exception() ,如果是 Error 则不处理。(二者都继承 Throwable)

    retry() & retryUtil() & retryWhen()
    重复试错

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("Error1"));
    //                        emitter.onError(new Throwable("Error2"));
                    emitter.onNext(3);
                }
            })
            .retry(2, new Predicate<Throwable>() {
                @Override
                public boolean test(Throwable throwable) throws Exception {
                    return true;
                }
            })
            .subscribe();
    
    // 输出
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onSubscribe false
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 1
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onNext: 2
    01-29 22:36:43.723 I/TestRxJava2Operate: testRetry onError: java.lang.Throwable: Error1
    

    条件操作符

    all() & any()
    all(): 是否全部满足
    any(): 是否存在一个

    contains() & isEmpty()
    contains(): 是否包含指定项
    isEmpty(): 是否为空

    sequenceEqual()
    判断俩个序列是否相等

    amb()
    作用两个或多个 Observable,但是只会发射最先发射数据的那个 Observable 的全部数据

    Observable
            .amb(Arrays.asList(Observable.range(1, 5),
                    Observable.range(6, 5)))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "testAmb " + integer);
                }
            }); 
    

    defaultIfEmpty()
    当指定的序列为空的时候指定一个用于发射的值,需要调用 onCompleted()

    转换操作符

    toList() & toSortedList()
    toMap() & toMultimap()
    toFlowable()
    to()

    Flowable 背压机制

    使用示例

    示例1

    Flowable
        .range(0, 100)
        .onBackpressureBuffer()
        .observeOn(Schedulers.io())
        .subscribe(new Subscriber<Integer>() {
            
            Subscription sub;
    
            @Override
            public void onSubscribe(Subscription s) {
                Log.w(TAG, "onsubscribe start");
                sub = s;
                s.request(2);
                Log.w(TAG, "onsubscribe end");
            }
    
            @Override
            public void onNext(Integer integer) {
                Log.w(TAG, "onNext--->" + integer);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                sub.request(3);
            }
    
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
    
            @Override
            public void onComplete() {
                Log.w(TAG, "onComplete");
            }
        });
    

    输出结果如下

    01-28 10:48:09.040 W/TestRxJava2: onsubscribe start
    01-28 10:48:09.040 W/TestRxJava2: onsubscribe end
    01-28 10:48:09.050 W/TestRxJava2: onNext--->0
    01-28 10:48:11.050 W/TestRxJava2: onNext--->1
    01-28 10:48:13.050 W/TestRxJava2: onNext--->2
    01-28 10:48:15.050 W/TestRxJava2: onNext--->3
    01-28 10:48:17.050 W/TestRxJava2: onNext--->4
    

    示例2

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "Consumer accept integer: " + integer);
                }
            });
    

    解决背压问题

    导致的原因:使用 .observeOn() 方法监听了 非背压PublishProcessortimer()interval()或者自定义的create()
    .observeOn() 方法的默认缓存大小是128,当生产的速度过快时,会很快超出该缓存大小,从而导致内存溢出。

    1 增加缓存大小

    使用.observeOn() 重载方法来设置缓存的大小

    PublishProcessor<Integer> source = PublishProcessor.create();
    source
            .observeOn(Schedulers.computation(), false, 1024 * 1024)
    //                .observeOn(Schedulers.computation())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    compute(integer);
                }
            });
    
    for (int i = 1; i < 1_000_000; i++) {
        source.onNext(i);
    }
    

    只解决暂时背压问题,当生产速率过快的时候还是可能造成缓存溢出

    2 通过丢弃或者过滤来减轻缓存压力

    使用 throttleXXX 或者 sample()等方式减少接收的数据

    PublishProcessor<Integer> source = PublishProcessor.create();
    source
            .sample(1, TimeUnit.SECONDS)
            .observeOn(Schedulers.computation())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    compute(integer);
                }
            });
    
    for (int i = 1; i < 1_000_000; i++) {
        source.onNext(i);
    }
    

    该方式仅用来减少下游接收的数据,不改变上游发送数据和缓存数据,解决背压问题,但还是会导致缓存溢出

    3 onBackpressureBuffer

    onBackpressureBuffer()

    无参的方式使用一个无界的缓存,只要虚拟机没有抛出 OOM 异常,它就会把所有的数据缓存下来,而只会将一小部分的数据传递给 observeOn

    Flowable
        .range(1, Integer.MAX_VALUE)
        .onBackpressureBuffer()
        .observeOn(Schedulers.computation(), false, 8)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                compute(integer);
            }
        });
    

    这种处理方式实际上不存在背压,因为 onBackpressureBuffer 缓存了所有的数据

    onBackpressureBuffer 系列

    onBackpressureBuffer(boolean delayError)
    onBackpressureBuffer(int capacity)
    onBackpressureBuffer(int capacity, boolean delayError)
    onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded)
    onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,Action onOverflow)
    onBackpressureBuffer(int capacity, Action onOverflow)
    onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)
    
    capacity:指定有界缓存
    delayError:是否延迟抛出异常
    unbounded:无界
    onOverflow:缓存溢出回调
    overflowStrategy:缓存策略
    
    public enum BackpressureOverflowStrategy {
        /** 抛出异常 BufferOverflowException */
        ERROR,
        /** 丢去最老的值*/
        DROP_OLDEST,
        /** 丢弃最新的值 */
        DROP_LATEST
    }
    

    4 onBackpressureDrop

    不会缓存任何数据,专注当下,新来的数据来不及处理就丢掉,以后会有更好的


    bp.obp.drop.png
    5 onBackpressureLatest

    会缓存一个数据,当正在执行某个人物的时候有新的数据过来,会把它缓存起来,如果又有新的数据过来,那就把之前的替换掉,缓存里面的总是最新的


    bp.obp.latest.png

    操作符

    public interface Action {
        void run() throws Exception;
    }
    
    public interface Consumer<T> {
        void accept(T t) throws Exception;
    }
    
    public interface BiConsumer<T1, T2> {
        void accept(T1 t1, T2 t2) throws Exception;
    }
    
    public interface LongConsumer {
        void accept(long t) throws Exception;
    }
    
    public interface Function<T, R> {
        R apply(@NonNull T t) throws Exception;
    }
    
    public interface BiFunction<T1, T2, R> {
        R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
    }
    
    public interface Function3<T1, T2, T3, R> {
        R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
    }
    
     ... Function4 ... Function8 ...
    
    public interface Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> {
        R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3, @NonNull T4 t4, @NonNull T5 t5, @NonNull T6 t6, @NonNull T7 t7, @NonNull T8 t8, @NonNull T9 t9) throws Exception;
    }
    
    public interface IntFunction<T> {
        T apply(int i) throws Exception;
    }
    
    public interface Predicate<T> {
        boolean test(@NonNull T t) throws Exception;
    }
    
    public interface BiPredicate<T1, T2> {
        boolean test(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
    }
    
    public interface Cancellable {
        void cancel() throws Exception;
    }
    
    public interface BooleanSupplier {
        boolean getAsBoolean() throws Exception; // NOPMD
    }
    

    非背压 Subject

    Subject 可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 之外,所有的方法都是线程安全的。此外,你还可以使用 toSerialized() 方法,也就是转换成串行的,将这些方法设置成线程安全的,如 PublishSubject.create().toSerialized()

    public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    
        public abstract boolean hasObservers();
    
        public abstract boolean hasThrowable();
    
        public abstract boolean hasComplete();
    
        public abstract Throwable getThrowable();
    
        public final Subject<T> toSerialized() {
            if (this instanceof SerializedSubject) {
                return this;
            }
            return new SerializedSubject<T>(this);
        }
    }
    

    AsyncSubjectBehaviorSubject、SingleSubjectPublishSubjectReplaySubjectUnicastSubjectCompletableSubjectMaybeSubject 等均继承 Subject

    AsyncSubject

    非粘性,只有先注册后发送事件才能接收
    只有调用 onComplete 才能触发

        public void testAsyncSubject() {
            AsyncSubject<String> subject = AsyncSubject.create();
            subject.onNext("one");
            subject.onNext("two");
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.i(TAG, "testAsyncSubject: " + s);
                }
            });
            subject.onNext("three");
            subject.onComplete();
        }
    

    输出

    01-27 18:18:03.190  I/TestRxJava2: testAsyncSubject: three
    

    BehaviorSubject

    能收到订阅之前的最后一个事件 和订阅之后发送的事件

            BehaviorSubject<String> subject = BehaviorSubject.create();
            subject.onNext("zero");
            subject.onNext("one");
            subject.onNext("two");
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.i(TAG, "testBehaviorSubject first: " + s);
                }
            });
            subject.onNext("three");
            subject.onNext("four");
    

    输出

    01-27 18:18:27.990  I/TestRxJava2: testBehaviorSubject: two
    01-27 18:18:27.990  I/TestRxJava2: testBehaviorSubject: three
     01-27 18:18:27.990 I/TestRxJava2: testBehaviorSubject: four
    

    PublishSubject

    非粘性,只有先注册后发送事件才能接收

            PublishSubject<Integer> subject = PublishSubject.create();
            subject.onNext(1);
            subject.onNext(2);
            subject.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "testPublishSubject first: " + integer);
                }
            });
            subject.onNext(3);
            subject.onNext(4);
            subject.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "testPublishSubject second: " + integer);
                }
            });
            subject.onNext(5);
            subject.onNext(6);
    

    输出

    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 3
    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 4
    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 5
    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject second: 5
    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject first: 6
    01-27 18:14:33.600  I/TestRxJava2: testPublishSubject second: 6
    

    ReplaySubject

    粘性事件

            ReplaySubject<String> subject = ReplaySubject.create();
            subject.onNext("zero");
            subject.onNext("one");
            subject.onNext("two");
            subject.onNext("four");
            subject.onNext("five");
    
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.i(TAG, "testRelaySubject first: " + s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.i(TAG, "testRelaySubject first error");
    
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    Log.i(TAG, "testRelaySubject first onComplete");
                }
            });
            subject.onNext("six");
            subject.onNext("seven");
            subject.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.i(TAG, "testRelaySubject second: " + s);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.i(TAG, "testRelaySubject second error");
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    Log.i(TAG, "testRelaySubject second onComplete");
                }
            });
            subject.onNext("eight");
            subject.onNext("nine");
            subject.onComplete();
    

    输出

    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: zero
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: one
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: two
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: four
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: five
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: six
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: seven
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: zero
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: one
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: two
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: four
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: five
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: six
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: seven
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: eight
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: eight
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first: nine
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second: nine
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject first onComplete
    01-27 18:21:14.840 I/TestRxJava2: testRelaySubject second onComplete
    

    UnicastSubject

    粘性事件,只能有一个观察者,java.lang.IllegalStateException: Only a single observer allowed.

    UnicastSubject<Integer> subject = UnicastSubject.create();
            subject.onNext(0);
            subject.onNext(1);
            subject.onNext(2);
            subject.onNext(3);
            subject.onNext(4);
            subject.onNext(5);
            subject.onNext(6);
            subject.onNext(7);
            subject.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "testUnicastSubject first: " + integer);
                }
            });
            subject.onNext(8);
            subject.onNext(9);
    

    输出

    01-27 18:22:10.440 I/TestRxJava2: testUnicastSubject first: 0
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 1
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 2
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 3
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 4
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 5
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 6
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 7
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 8
    01-27 18:22:10.450 I/TestRxJava2: testUnicastSubject first: 9
    

    背压 Processor

    public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {
        
        public abstract boolean hasSubscribers();
    
        public abstract boolean hasThrowable();
    
        public abstract boolean hasComplete();
    
        public abstract Throwable getThrowable();
    
        public final FlowableProcessor<T> toSerialized() {
            if (this instanceof SerializedProcessor) {
                return this;
            }
            return new SerializedProcessor<T>(this);
        }
    }
    

    以下均继承FlowableProcessor

    AsyncProcessor
    BehaviorProcessor --
    MulticastProcessor
    PublishProcessor --
    ReplayProcessor
    SerializedProcessor --
    UnicastProcessor

    Scheduler 线程调度

    computation()
    io()
    trampoline()
    newThread()
    single()
    from(@NonNull Executor executor)
    

    统一取消订阅

    CompositeDisposable 统一订阅
    subscribeWith 返回观察者
    ResourceSubscriber 等实现 Disposable 接口

    CompositeDisposable compositeDisposable = new CompositeDisposable();
    
    ResourceSubscriber<Integer> resourceSubscriber 
            = Flowable
            .range(1, 8)
            .subscribeWith(new ResourceSubscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                }
    
                @Override
                public void onError(Throwable t) {
                }
    
                @Override
                public void onComplete() {
                }
            });
    
    compositeDisposable.add(resourceSubscriber);
    

    类似 ResourceSubscriber 的还有

    Subscribers 系列

    public abstract class DisposableSubscriber<T> implements FlowableSubscriber<T>, Disposable 
    
    public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable 
    
    // 没有实现 Disposable,而是实现了Subscription
    public final class SafeSubscriber<T> implements FlowableSubscriber<T>, Subscription 
    
    // 没有实现 Disposable,而是实现了Subscription
    public final class SerializedSubscriber<T> implements FlowableSubscriber<T>, Subscription 
    
    public class TestSubscriber<T>
    extends BaseTestConsumer<T, TestSubscriber<T>>
    implements FlowableSubscriber<T>, Subscription, Disposable 
    

    Observers 系列

    public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable 
    
    public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable 
    
    public abstract class DisposableObserver<T> implements Observer<T>, Disposable 
    
    public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable 
    
    public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable 
    
    public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable 
    
    public abstract class ResourceObserver<T> implements Observer<T>, Disposable 
    
    public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable 
    
    public final class SafeObserver<T> implements Observer<T>, Disposable 
    
    public final class SerializedObserver<T> implements Observer<T>, Disposable 
    
    public class TestObserver<T>
    extends BaseTestConsumer<T, TestObserver<T>>
    implements Observer<T>, Disposable, MaybeObserver<T>, SingleObserver<T>, CompletableObserver 
    

    参考资料

    感谢以下文章作者
    RxJava2 系列-1:一篇的比较全面的 RxJava2 方法总结
    关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
    RxJava2 只看这一篇文章就够了
    RxJava 组合操作符

    相关文章

      网友评论

          本文标题:RxJava2

          本文链接:https://www.haomeiwen.com/subject/fhynjqtx.html