美文网首页
RxJava1升级到RxJava2的注意事项

RxJava1升级到RxJava2的注意事项

作者: fomin | 来源:发表于2018-09-14 14:04 被阅读214次
    1、package更改

    rx1包名由原来的rx.xxx更改为io.reactivex.xxx,并且在同一个module之下,rx1和rx2是不兼容的。

    2、背压支持

    RxJava在1.0只有一个个观察者模式,只能部分支持背压:

    • Observable(被观察者)/Observer(观察者)
    • Observable(被观察者)/Subscriber(观察者)

    RxJava在2.0出现了两个观察者模式,新增Flowable支持背压,而Observable不支持背压:

    • Observable(被观察者)/Observer(观察者)
    • Flowable(被观察者)/Subscriber(观察者)

    注:背压是指在<mark style="box-sizing: border-box;">异步</mark>场景中,<mark style="box-sizing: border-box;">被观察者发送事件速度远快于观察者的处理速度</mark>的情况下,一种告诉上游的被观察者降低发送速度的策略,简而言之,背压是流速控制的一种策略。

    3、NULL值支持

    rx1.x是可以支持NULL值的

    Observable.just(null);
    Single.from(null);
    

    rx2.x是不支持NULL值的

    Observable.just(null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    Single.just(null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    
    4、ActionN 和 FuncN 改名

    ActionN 和 FuncN 遵循Java 8的命名规则。

    其中,Action0 改名成Action,Action1改名成Consumer,而Action2改成BiConsumer,而Action3 - Action9都不再使了,ActionN改成Consumer<Object[]>,新的Action都增加抛出异常处理

    同样,Func改名成Function,取消了Func0的方法,Func2改名成BiFunction,Func3 - Func9 改成 Function3 - Function9,FuncN 改为 Function<Object[], R>,新的Function都增加抛出异常处理

    5、Subscription改为Disposable

    rx1.x的Subscription

    public final Subscription subscribe(final Action1<? super T> onNext)
    

    rx2.x返回值的是Disposable,由于已经存在了 org.reactivestreams.subscription 这个类,为了避免名字冲突将原先的 rx.Subscription 改名为 io.reactivex.disposables.Disposable

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    

    注意:Subscription 不再有订阅subcribe和unSubcribe的概念。

    6、Observer和Subscriber更改

    rx2.x中Observable不在支持Subscriber方法,改由Flowable支持Subscriber,而Observable只支持Observer方法。

    新增onSubscribe方法,Observer提供了Disposable参数,而Subscriber提供了Subscription参数,向观察者提供以同步和异步方式取消与Observable的连接的方法。

    rx1.x的Observer

    public interface Observer<T> {
        void onCompleted();
        void onError(Throwable e);
        void onNext(T t);
    }
    

    rx2.x的Observer

    public interface Observer<T> {
        void onSubscribe(Disposable d);//
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }
    
    public interface Subscriber<T> {
        void onSubscribe(Subscription s);//相当于onStart方法
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }
    
    7、Observable.OnSubscribe改为ObservableOnSubscribe

    rx1.x写法

    Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
    
        }
    });
    

    rx2.x写法

    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
    
        }
    });
    
    8、ObservableOnSubscribe 中使用 ObservableEmitter 发送数据给 Observer

    ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。emitter的onComplete()调用后,Consumer不再接收任何next事件。

    9、Observable.Transformer 变成 ObservableTransformer

    rx1.x的写法

    public synchronized <T> Observable.Transformer<T, T> bindToLifeCycle() {
            return new Observable.Transformer<T, T>() {
                @Override
                public Observable<T> call(Observable<T> tObservable) {
                    if (lifecycleSubject == null) return upstream;
                    return upstream.takeUntil(lifecycleSubject);
                }
            };
        }
    

    rx2.x的写法

    public synchronized <T> ObservableTransformer<T, T> bindToLifeCycle() {
            return new ObservableTransformer<T, T>() {
                @Override
                public ObservableSource<T> apply(Observable<T> upstream) {
                    if (lifecycleSubject == null) return upstream;
                    return upstream.takeUntil(lifecycleSubject);
                }
            };
        }
    

    同时新增Flowable的写法

    public synchronized <T> FlowableTransformer<T, T> bindToLifeCycle() {
            return new FlowableTransformer<T, T>() {
                @Override
                public Publisher<T> apply(Flowable<T> upstream) {
                    if (lifecycleSubject == null) return upstream;
                    return upstream.takeUntil(lifecycleSubject);
                }
            };
        }
    
    10、from变更

    rx1.x中统一使用from方法名,但rx2.x不在使用from,而是已from+参数类型名命名 rx2.x新增了Publisher参数的from方法。

    rx1.0
    public static <T> Observable<T> from(T[] array)
    public static <T> Observable<T> fromCallable(Callable<? extends T> func) 
    public static <T> Observable<T> from(Iterable<? extends T> iterable)
    public static <T> Observable<T> from(Future<? extends T> future)
    rx2.0
    public static <T> Observable<T> fromArray(T... items)
    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
    public static <T> Observable<T> fromFuture(Future<? extends T> future)
    public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
    public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
    
    11、blocking变更

    rx1.x

    .toBlocking().firstOrDefault(defaultValue)
    

    rx2.x

    .blockingFirst(defaultValue);
    

    在项目升级RxJava中暂时遇到这些变更,具体RxJava2.x的使用请去github查看https://github.com/ReactiveX/RxJava

    相关文章

      网友评论

          本文标题:RxJava1升级到RxJava2的注意事项

          本文链接:https://www.haomeiwen.com/subject/exjogftx.html