美文网首页技术技术干货儿Android: Rx
RxJava1.X升级到RxJava2.X笔记

RxJava1.X升级到RxJava2.X笔记

作者: 续写经典 | 来源:发表于2016-11-01 11:18 被阅读10201次
    描述 RxJava 1.X RxJava 2.X
    package包名 rx.xxx io.reactivex.xxx
    Reactive Streams规范 1.X早于Reactive Streams规范出现,仅部分支持规范 完全支持
    Backpressure 背压 对背压的支持不完善 Observable设计为不支持背压
    新增Flowable支持背压
    null空值 支持 不再支持null值,传入null值会抛出 NullPointerException
    Schedulers线程调度器 Schedulers.immediate()
    Schedulers.trampoline()
    Schedulers.computation()
    Schedulers.newThread()
    Schedulers.io()
    Schedulers.from(executor)
    AndroidSchedulers.mainThread()
    移除Schedulers.immediate()
    新增Schedulers.single()
    其它未变
    Single 行为类似Observable,但只会发射一个onSuccessonError 按照Reactive Streams规范重新设计,遵循协议onSubscribe(onSuccess/onError)
    Completable 行为类似Observable,要么全部成功,要么就失败 按照Reactive Streams规范重新设计,遵循协议onSubscribe (onComplete/onError)
    Maybe 2.X新增,行为类似Observable,可能会有一个数据或一个错误,也可能什么都没有。可以将其视为一种返回可空值的方法。这种方法如果不抛出异常的话,将总是会返回一些东西,但是返回值可能为空,也可能不为空。按照Reactive Streams规范设计,遵循协议onSubscribe (onSuccess/onError/onComplete)
    Flowable 2.X新增,行为类似Observable,按照Reactive Streams规范设计,支持背压Backpressure
    Subject AsyncSubject
    BehaviorSubject
    PublishSubject
    ReplaySubject
    UnicastSubject
    2.X依然维护这些Subject现有的功能,并新增:
    AsyncProcessor
    BehaviorProcessor
    PublishProcessor
    ReplayProcessor
    UnicastProcessor
    支持背压Backpressure
    Subscriber Subscriber 由于与Reactive Streams的命名冲突,Subscriber已重命名为Disposable

    RxJava 2.X + Retrofit + OkHttp 简单示例点这里

    library依赖变化

    //1.X
    compile 'io.reactivex:rxjava:1.2.1'
    compile 'io.reactivex:rxandroid:1.2.1'
    
    //2.X
    compile 'io.reactivex.rxjava2:rxjava:2.0.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.0'
    

    package变化

    变动主要为rx.xxx --> io.reactivex.xxx

    //1.X
    import rx.Observable;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.schedulers.Schedulers;
    import rx.functions.Action1;
    
    //2.X
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.functions.Consumer;
    

    null

    RxJava 2.X不再支持null值,如果传入一个null会抛出NullPointerException

    Observable.just(null);
    
    Single.just(null);
    
    Observable.fromCallable(() -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    
    Observable.just(1).map(v -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    

    案例1

    //1.X
    public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.io())
                                            .unsubscribeOn(Schedulers.io())
                                            .observeOn(Schedulers.io());
        }
    };
    public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){
        return (Observable.Transformer<T, T>)transformer;
    }
    Action1<Integer> onNext = null;
    String[] items = { "item1", "item2", "item3" };
    Subscription subscription = Observable.from(items)
                                          .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                          .map(new Func1<String, Integer>() {
                                                      @Override public Integer call(String s) {
                                                          return Integer.valueOf(s);
                                                      }
                                                  })
                                          .subscribe(onNext);
    //TODO subscription.unsubscribe();   
    
    //2.X
    public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
        @Override public ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                           .unsubscribeOn(Schedulers.io())
                           .observeOn(Schedulers.io());
        }
    };
    public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){
        return (ObservableTransformer<T, T>)transformer;
    }
    Consumer<Integer> onNext = null;
    String[] items = { "item1", "item2", "item3" };
    Disposable disposable = Observable.fromArray(items)
                                      .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                      .map(new Function<String, Integer>() {
                                                  @Override public Integer apply(String s) throws Exception {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);
    //TODO disposable.dispose();
    
    • .subscribe(...)返回值的变化:1.X为Subscription, 2.X为Disposable
    • Transformer的变化:1.X为rx.Observable内部的Transformer接口, 继承自Func1<Observable<T>, Observable<R>>, 2.X为io.reactivexObservableTransformer<Upstream, Downstream>,是一个独立的接口
    • AndroidSchedulers的变化: 1.X为rx.android.schedulers.AndroidSchedulers, 2.X为io.reactivex.android.schedulers.AndroidSchedulers
    • Func1的变化: 1.X为rx.functions.Func1, 2.X为io.reactivex.functions.Function
    • 其它重载方法见下方截图


      1.X
      2.X

    案例2

    //1.X
    public class AppBaseActivity extends AppCompatActivity {
        ...
        private CompositeSubscription mCompositeSubscription;
        
        protected void addSubscription(Subscription subscription) {
            if (null == mCompositeSubscription) {
                mCompositeSubscription = new CompositeSubscription();
            }
            mCompositeSubscription.add(subscription);
        }
    
        @Override protected void onDestroy() {
            if (null != mCompositeSubscription) {
                mCompositeSubscription.unsubscribe();
            }
            super.onDestroy();
        }
        ...
    }
    
    //2.X
    public class AppBaseActivity extends AppCompatActivity {
        ...
        private   CompositeDisposable mCompositeDisposable;
    
        protected void addDisposable(Disposable disposable) {
            if (null == mCompositeDisposable) {
                mCompositeDisposable = new CompositeDisposable();
            }
            mCompositeDisposable.add(disposable);
        }
    
        @Override protected void onDestroy() {
            if (null != mCompositeDisposable) {
                mCompositeDisposable.clear();
            }
            super.onDestroy();
        }
        ...
    }
    

    我现在维护的项目基本就这些改动,如有错误之处,还望批评指正,谢谢!

    相关文章

      网友评论

      • mundane:大哥, 为什么我在你的demo里面没有找到rxjava、rxAndroid还有那个xLog的依赖, lib文件夹也没有。你是怎么办到的?
        续写经典:你说的应该是: RxJava 2.X + Retrofit + OkHttp 这个Demo吧,你看下这个库就明白了:https://github.com/qyxxjd/BaseProject
      • WangDDY:你太赞了
      • 光源_Android:想请教一下,你项目中有用到其他依赖 RxJava 的第三方库么,比如 GreenDAO? 多版本 RxJava 共存有没有碰到什么问题?
        续写经典:@光源_Android 我们公司的项目暂时比较简单,没有引用太多第三方库,只是单纯的将RxJava从1.X升级到2.X还是很简单的,仅仅修改了一个工具类和替换一些包名
        光源_Android:@续写经典 多谢,那你为何从 1.x 升到 2.x ? (clean框架已经升了,我索性直接用 2.x 了
        续写经典:@光源_Android 我自己的项目只会用某一个版本,不存在多版本共存问题。数据库我用的sqlbrite,如果你的项目中使用了RxJava 1.X系列,强烈推荐这个库。如果你使用的RxJava 2.X,一些第三方依赖RxJava的库大多基于1.X系列,只能自己做一层适配。或者看下RxJava核心开发者akarnokd写的一个库 https://github.com/akarnokd/RxJava2Interop ,也许会有一些帮助

      本文标题:RxJava1.X升级到RxJava2.X笔记

      本文链接:https://www.haomeiwen.com/subject/lpcmuttx.html