美文网首页
RxJava 2.0 简单介绍

RxJava 2.0 简单介绍

作者: coofee | 来源:发表于2017-10-11 13:26 被阅读0次

    RxJava 2.0 简单介绍

    一年一年有一年,RxJava也新增了2.0版本,那么为什么是新增版本而不说升级版本呢?

    因为2.0版本和1.0版本两者并不兼容,2.0版本是基于Reactive-Streams规范重新设计而来;同时1.x版本和2.x版本两者会并行开发维护,但是1.x版本只维护到2018-03-31

    下面我们简单介绍一下两者的不同。

    0x00 依赖&包名不同

    使用rxjava 1.x、2.x版本的依赖如下:

    // rxjava 1.x
    compile 'io.reactivex:rxjava:1.1.6'
    
    // rxjava 2.x
    compile "io.reactivex.rxjava2:rxjava:2.x.y"
    

    包名修改如下:

    // 1.x -> 2.x
    rx.** -> io.reactivex.**
    

    0x01 Observable与Flowable

    Observable在2.0版本不支持backpressure,它会缓存全部的数据,一一发送给消费者,如果消费不及时,会产生OOM。于此对应,在2.x版本新增了Flowable,支持设置/自定义backpressure,同时在创建时必须制定backpressure。

     Flowable.create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> e) throws Exception {
                    for (int i = 0; i < 256; i++) {
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            }, BackpressureStrategy.BUFFER).subscribe(System.out::println, Throwable::printStackTrace);
    
    
    

    0x02 Single

    当使用Single时,生产者调用onSuccess()通知订阅者,同时终止整个事件流,生产者只能发送一个success事件,订阅者也只能收到一个success事件,适用于网络请求等确定只有单个事件的事件流。对于1.x版本而言,则需要主动调用onComplete()来终止事件流。

    注意: Single没有onComplete()方法;只能产生success、error两种事件。

      Single.create(s -> s.onSuccess("aaaa"))
                    .subscribe(System.out::println, Throwable::printStackTrace);
    

    0x03 Completable

    当使用Completable时,生产者通过调用onComplete()终止事件流,订阅者会收到事件结束回调,适用于订阅者仅需要知道事件结束,而不需要执行结果的情形。

    注意: Completable没有onSuccess()方法;只能产生complete、error两种事件。

    Completable.create(new CompletableOnSubscribe() {
                @Override
                public void subscribe(CompletableEmitter e) throws Exception {
                    // do something;
                    e.onComplete();
                }
            }).subscribe();
    

    0x04 Maybe

    MaybeSingleCompletable的组合体,相较于Single只能发送一次item,Completable只能通知事件结束,Maybe可以发送最多一个item,也就是可以发送一个item或者直接终止事件流。

    Maybe调用onSuccess()结束事件流时,订阅者收到一次success事件;当Maybe调用onComplete()结束事件流时,订阅者只能收到事件结束事件。

    • onSuccess()收到一次事件:
     Maybe.create(new MaybeOnSubscribe<Object>() {
                @Override
                public void subscribe(MaybeEmitter<Object> e) throws Exception {
                    e.onSuccess("aaa");
                }
            }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
                System.out.println("onCompletable...");
            });
    
    • onComplete()收到结束事件:
     Maybe.create(new MaybeOnSubscribe<Object>() {
                @Override
                public void subscribe(MaybeEmitter<Object> e) throws Exception {
                    e.onComplete();
                }
            }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
                System.out.println("onCompletable...");
            });
    
    

    注意: Maybe拥有onSuccess()和onComplete()方法;可以产生success、complete、error三种事件,其中success和complete是对立的。

    0x05 Null

    2.0x版本不支持传递null事件,会抛出NullPointerException终止整个事件流。

    Single.create(new SingleOnSubscribe<Object>() {
                @Override
                public void subscribe(SingleEmitter<Object> e) throws Exception {
                    e.onSuccess(null);
                }
            }).subscribe(System.out::println, Throwable::printStackTrace);
    

    错误日志如下:

    java.lang.NullPointerException: onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.
    

    0x06 取消订阅

    1. 接口改变

    2.x版本由于按照Reactive-Streams规范进行开发,而在Reactive-Streams中已经定义了org.reactivestreams.Subscription接口

    package org.reactivestreams;
    
    public interface Subscription {
        void request(long var1);
    
        void cancel();
    }
    
    

    ,而1.x版本也定义了一个rx.Subscription接口

    package rx;
    
    public interface Subscription {
        void unsubscribe();
    
        boolean isUnsubscribed();
    }
    
    

    2. 简单取消订阅

    可以看到两个类名一样,但是接口方法并不一样,含义也不相同,所以为了避免歧义,2.x版本中干掉了旧的Subscription,同时使用Disposable接口来替代旧的Subscription。具体代码如下:

    // 1.x 调用unsubscribe()方法来取消订阅
    final rx.Subscription subscription = rx.Observable.just(1, 2, 3).subscribe();
    subscription.unsubscribe();
    
    // 2.x 调用dispose()方法来取消订阅
    final Disposable subscriber = Flowable.just(1, 2, 3).subscribe();
    subscriber.dispose();
    

    3. 使用Subscriber取消订阅

    在1.x版本中,我们调用subscribe()后会返回一个rx.Subscription,我们可以使用它进行操作;在2.x版本中,我们调用subscribe()时,如果传入的是Subscriber,那就返回值是void,需要大家自己保存引用。

    
    // 1.x 
    rx.Subscription subscription = rx.Observable.just(1, 2, 3)
      .subscribe(new rx.Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    subscription.unsubscribe();
    
    
    // 2.x
    ResourceSubscriber<Integer> resourceSubscriber = new ResourceSubscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
    
                }
    
                @Override
                public void onError(Throwable t) {
                  // must dispose;
                      dispose();
                }
    
                @Override
                public void onComplete() {
                  // must dispose;
                      dispose();
                }
            };
    // 注意当传入subscriber进行订阅时,返回值是void,所以需要自己保存;
    Flowable.just(1, 2, 3).subscribe(resourceSubscriber);
    resourceSubscriber.dispose();
    
    

    4. 批量取消订阅

    1.x版本使用rx.CompositeSubscription批量取消订阅;2.x版本使用io.reactivex.disposables.CompositeDisposable批量取消订阅。

    0x07 Subject & Processor

    按照Reactive-Streams规范,Subject是一种行为,既是消费者,同时也是生成者,最终被定义为org.reactivestreams.Processor接口,故而,在1.x版本中的subject,在2.x版本中就变成了processor,并且支持backpressure。同时2.x版本中保留了1.x版本的subject,配合Observable使用,不过也不支持backpressure。如:

    // 1.x
    Subject<Object, Object> subject= new SerializedSubject<>(PublishSubject.<Object>create());
    subject.onNext("aaa");
    subject.onError("aaa");
    subject.onComplete();
    
    
    // 2.x
    final FlowableProcessor<Object> objectFlowableProcessor =
                PublishProcessor.create().toSerialized();
    objectFlowableProcessor.onNext("aa");
    objectFlowableProcessor.onError(new Throwable());
    objectFlowableProcessor.onComplete();
    

    参考

    1. RxJava 2.0

    相关文章

      网友评论

          本文标题:RxJava 2.0 简单介绍

          本文链接:https://www.haomeiwen.com/subject/oxhwyxtx.html