美文网首页AndroidAndroid知识Android技术知识
RxJava(2.0)-你可能需要知道这些

RxJava(2.0)-你可能需要知道这些

作者: 24K男 | 来源:发表于2017-09-22 13:35 被阅读0次

    作为一个Android开发从业者,当你处理异步任务时,如果还在使用着Handler+Thread,那么你可能需要了解下RxJava这个优秀的开源框架;当然如果你正在跳槽面试,RxJava也是经常被问到的框架。
    关于介绍RxJava的文章也非常多,但是很多文章基于的版本还是1.0.X,而本博文就基于2.0版本对RxJava进行一个简单的介绍和分析,也算是抛砖引玉吧。
    本博文基于RxJava 2.0.0版本进行分析讲解。
    参考:抛物线大神《给 Android 开发者的 RxJava 详解》

    RxJava是什么?

    简单的归纳为两个字:异步

    归纳毕竟是归纳,不能完全表明RxJava的概念,那么我们来看GitHub上给出的解释:
    a library for composing asynchronous and event-based programs by using observable sequences.

    我用我蹩脚的CET-6水平给大家翻译下,大概就是这个意思:
    一个使用可观测序列来组成异步的、基于事件的程序的库。

    这对于刚接触的童鞋们可能不太容易理解,RxJava的核心还是异步,其他的定语都是基于其之上,有了这个思维和认识,再去学习RxJava也能更容易接受和理解其设计。


    为什么要使用RxJava?

    我写溜溜的[AsyncTask / Handler / Thread/ ... ],干嘛要使用这个奇怪的RxJava啊?

    还能为什么?简洁呗

    异步操作的很重要的一点就是保持程序和代码的简洁性,Android内部提供的AsyncTask以及Handlder+Thread都是为了解决异步代码编写繁琐问题,从而使编写异步代码更加简洁。在保持代码和程序简洁这个目的上,RxJava倒是更加的努力和方便,它的优点是随着程序逻辑变得越来越复杂,它仍然可以保持简洁、优雅

    口说无凭,我们来分析下面这样一个例子。

    图片展示可能是我们每个Android开发者都要面对的问题,假设在我们的Activity上存在一个ListView,并且我们提供了一个addImage方法来任意添加待显示的图片。现在需要将某个目录下所有的png图片都加载并显示在ListView中,由于读取和解析图片是一个耗时过程,因此我们需要将这个过程放在后台执行;而图片的显示则必须放在主线程(UI线程)中。

    那么在没有使用RxJava时,我们怎么编写这段代码呢?

        new Thread() {
                @Override
                public void run() {
                    super.run();
                    for (File folder : folders) {
                        File[] files = folder.listFiles();
                        for (File file : files) {
                            if (file.getName().endsWith(".png")) {
                                final Bitmap bitmap = getBitmapFromFile(file);
                                ((MainActivity) context).runOnUiThread(new Runnable() {
                                    @Override
                                    public void run() {
                                        imageList.add(bitmap);
                                        imageListAdatper.notifyDataSetChanged();
                                    }
    
                                });
    
                            }
                        }
                    }
                }
            }.start();
    
    

    没有对比,就没有伤害,如果我们使用RxJava的话,是如何实现的呢?

    
    Observable.fromArray(folders)
                    .flatMap(new Function<File, ObservableSource<File>>() {
                        @Override
                        public ObservableSource<File> apply(File file) throws Exception {
                            return Observable.fromArray(file.listFiles());
                        }
                    })
                    .filter(new Predicate<File>() {
                        @Override
                        public boolean test(File file) throws Exception {
                            return file.getName().endsWith(".png");
                        }
                    })
                    .map(new Function<File, Bitmap>() {
    
                        @Override
                        public Bitmap apply(File file) throws Exception {
                            return getBitmapFromFile(file);
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Bitmap>() {
                        @Override
                        public void accept(Bitmap bitmap) throws Exception {
                            imageList.add(bitmap);
                            imageListAdatper.notifyDataSetChanged();
                        }
                    });
    
    

    这代码变简洁了吗?这代码量也没减少啊,而且这一大堆代码都是什么意思啊?完全看不懂啊。

    各位看官,你先消消气,我们讲的简洁是:逻辑上的简洁,并不是单纯的代码减少(说实话,我们其实更关注这个)。

    仔细看下这段代码,之前的if..else呢?之前的那么多循环呢?好像都不见了,完全是从上到下的一条链式调用,而且没有嵌套(你是不是也讨厌好多层的嵌套,反正我是),现在看起来是不是逻辑更加清楚了呢。

    此时RxJava的优势还不能完全体现出来,而且看到这么多陌生的函数,你也一定有点不知其解,那么我们就带着疑惑接着往下看。

    API

    虽然我知道你有很强的理解和学习能力,但是我还是决定要对RxJava的一些常用的API进行介绍和说明,以便你能更顺畅的阅读全文。

    1.观察者模式

    RxJava的异步实现,是通过一种扩展的观察者模式来实现的。

    我们来看下什么是观察者模式?

    观察者模式(有时又被称为发布(publish )-订阅(Subscribe)模式、模型-视图(View)模式、源-收听者(Listener)模式或从属者模式)是软件设计模式的一种。在此种模式中,一个目标物件管理所有相依于它的观察者物件,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实现事件处理系统。

    这是百度给出的解释,我们在日常编码中使用的点击事件的处理就采用了观察者模式。

    clkBtn.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View v) {
                    Toast.makeText(MainActivity.this, "The button was clicked", Toast.LENGTH_LONG).show();
    
                }
            });
    
    

    在典型的Click事件处理中,Button就是被观察者,而我们设置的OnClickListener就是观察者,在我们点击Button时,OnClickListener的onClick方法就会被回调。

    2. RxJava的观察者模式

    2.1 几个对象

    我们先来了解下RxJava给我们提供的几个常用的对象。

    • FLowable与Observable

    在2.0版本中被观察者新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。

    举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

    而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。

    幸运的是,Flowable 中的操作符大多与旧有的 Observable 类似。

    • Observer与Subscriber

    Observer就是我们前面提到的观察者,与Observable组合使用。

    Subscriber也被成为订阅者,一般与Flowable组合使用。

    因为Observable不再支持背压,因此如果我们使用RxJava2.0版本,Flowable可能是你的不二人选。

    基于以上的分析,本文以下的示例将采用Flowable进行说明和讲解。

    2.2 回调

    为什么称RxJava采用了扩展的观察者模式呢?我们知道传统的观察者回调接口中只有一个update方法,那么RxJava呢?它可不止一个,让我们来看下Subscriber的定义。

    public interface Subscriber<T> {
        /**
         * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
         * <p>
         * No data will start flowing until {@link Subscription#request(long)} is invoked.
         * <p>
         * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
         * <p>
         * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
         * 
         * @param s
         *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
         */
        public void onSubscribe(Subscription s);
    
        /**
         * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
         * 
         * @param t the element signaled
         */
        public void onNext(T t);
    
        /**
         * Failed terminal state.
         * <p>
         * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
         *
         * @param t the throwable signaled
         */
        public void onError(Throwable t);
    
        /**
         * Successful terminal state.
         * <p>
         * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
         */
        public void onComplete();
    }
    
    

    RxJava的观察者接口中提供了onSubscribe、onNext、onError、onComplete四个回调方法,而传统的观察者模式中只有update一个回调方法,这也是称之为扩展的观察者模式的一部分原因。
    下面我们来分析下Subscriber接口中几个方法:

    1. onSubscribe
      这个方法是2.0之后才有的方法,主要是给观察者提供了一个终止事件接收的机会(当然我们也可以做一些预处理),它也会首先被调用。
      要终止接收事件,可以调用Subscription的cancel方法。

    2. onNext
      我们可以将其理解为传统观察者模式回调接口中的update方法,它可能会被调用多次。它的调用顺序在onSubscribe之后。

    3. onError
      在事件处理过程中出异常时,onError会被触发,同时事件队列自动终止,不会再有事件发出。

    4. onComplete
      在事件队列传递完毕后,该方法会被调用。
      在一个正确运行的事件序列中, onComplete()和onError()有且只有一个,并且是事件序列中的最后一个。
      需要注意的是,onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    在一个正确的事件序列中,onError与onComplete互斥且唯一。

    相比于传统的观察者模式,RxJava使用的扩展观察者模式好像变得复杂了,但是从另一方面来讲它也更加的丰富了,把更多的主动权和机会交给了使用者。

    3. 实战

    看了那么多的概念,是不是觉得有点枯燥和乏味呢,那我们就开始动手使用RxJava来体验一下吧。

    3.1 引用

    怎么在我们的项目中使用RxJava和RxAndroid呢?

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    

    3.2 实例

    1. 1.0 方式
            //定义被观察者
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("Hello");
                    e.onNext("World");
                    e.onNext("!");
                    //注意在此调用onComplete方法结束事件的处理
                    e.onComplete();
                }
            });
    
    
            // 定义观察者
            Observer<String> observer = new Observer<String>() {
    
                // 该方法会在onNext方法之前调用
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe->11111");
    
                    // d.dispose();
                }
    
                @Override
                public void onNext(String value) {
                    System.out.println(value);
    
                }
    
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
    
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete->222222");
                }
            };
    
            // 订阅
            observable.subscribe(observer);
            
    
    
    1. 2.0方式
            //创建Flowable对象
            Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
                @Override
                public void subscribe(@NonNull FlowableEmitter e) throws Exception {
                    e.onNext("Hello");
                    e.onNext("World");
                    e.onNext("!");
                    //注意在此调用onComplete方法结束事件的处理
                    e.onComplete();
                }
            }, BackpressureStrategy.BUFFER);
    
            // 定义观察者
            Subscriber subsrciber= new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println("onSubscribe->11111");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete->222222");
                }
            };
    
            // 订阅
            flowable.subscribe(subsrciber);
    
    
    
    1. 订阅

    订阅这句代码看起来好奇怪,主要是subscribe()这个方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

    1. 运行结果

    分别运行上面的两段代码,运行效果相同,如下所示:

    onSubscribe->11111
    Hello
    World
    !
    onComplete->222222
    

    这可能是最简单的RxJava使用示例了。

    3.3 创建被观察者

    在上面的示例中,我们采用了Observable.create方法来创建被观察者,并且在subscribe方法中完成了事件的传递。
    RxJava 还提供了一些方法用来快捷创建事件队列,我们一起来看一下。

    1. just(T...)

    将传递的参数,依次发送出去。

    Flowable.just("Hello", "World", "!")
    // 将会依次调用:
    // onNext("Hello");
    // onNext("World");
    // onNext("!");
    // onComplete();
    
    

    这句代码的效果与上面示例中的效果相同。

    1. from(T[]) / from(Iterable<? extends T>)
      将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
    String[] values = new String[]{"Hello", "Wrold", "!"};
    
    Flowable observable = Flowable.fromArray(values);
    // 将会依次调用:
    // onNext("Hello");
    // onNext("World");
    // onNext("!");
    // onComplete();
    
    

    上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create() 的例子是等价的。

    3.4 灵活的事件回调定义

    RxJava支持定义不完整的事件回调定义,就是我们可以抛弃Subscriber的定义,而只选择定义其中的一部分回调。
    看下代码可能会更明了。

            String[] values = new String[]{"Hello", "Wrold", "!"};
            Consumer onNext = new Consumer<String>() {
    
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("onNext:" + s);
                }
            };
    
            Consumer<? super Throwable> onError = new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    throwable.printStackTrace();
                }
            };
    
            Action onComplete = new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("onComplete");
                }
            };
    
            // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
            Flowable.fromArray(values)
                    .subscribe(onNext);
    
            // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
            Flowable.fromArray(values)
                    .subscribe(onNext, onError);
    
            // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
            Flowable.fromArray(values)
                    .subscribe(onNext, onError, onComplete);
    
    

    是不是很灵活?嗯,是的。

    3.5 Schedulers

    在 RxJava的默认规则中,事件的发出和消费都是在同一个线程的,在哪个线程调用subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
    也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。
    观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于RxJava 是至关重要的。
    而要实现异步,则需要用到 RxJava 的另一个概念: Schedulers(调度器) 。

    1. API

    在RxJava中,Scheduler相当于线程控制器,RxJava通过它来指定每一段代码应该运行在什么样的线程。

    RxJava已经内置了一些调度器,主要有以下几个:

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程,这是默认的Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行,属于Android专用的调度器。

    有了这几个Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

    • subscribeOn(): 指定 subscribe() 所发生的线程,即Flowable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

    • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

       Flowble.just(1, 2, 3)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("The receive num is :" + integer);
                    }
                }
      

    上面这段代码中,由于subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3 将会在 IO线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber数字的打印将发生在主线程。

    事实上,这种在 subscribe() 之前写上两句subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

    4.变换

    RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

    在开发中我们经常碰到这样的场景:从本地读取并加载图片。也就是说我们通常的入参是一个文件路径,而我们想要得到的是一个BitMap对象,那么如果使用RxJava我们该如何优雅的实现呢?

        final String filePath = "/images/logo.png";
    
            Flowble.just(filePath)
                    .map(new Function<String, Bitmap>() {
    
                        @Override
                        public Bitmap apply(@NonNull String s) throws Exception {
                            return getBitmapFromFile(new File(filePath));
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Bitmap>() {
                        @Override
                        public void accept(Bitmap bitmap) throws Exception {
                            showBitmap(bitmap);
                        }
                    });
    
    

    就问你优雅不优雅?牛逼不牛逼?

    可以看到,map()方法将参数中的String对象转换成一个Bitmap对象后返回,而在经过map()方法后,事件的参数类型也由 String转为了Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

    那么常用的事件变换有那些呢?

    1. map

    事件对象的直接变换,具体功能上面已经介绍过,它是RxJava 最常用的变换。
    在上面的例子中我们可以看到,map方法将参数中的 String对象变换为一个 Bitmap对象后返回,而在经过 map方法后,事件的参数类型也由String变为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

    2. flatMap

    flatMap和map有共同点,都是将一个对象转换为另一个对象,不同的是map只是一对一的转换,而flatMap可以是一对多的转换,并且是转换为另外一个Flowable对象!
    示例如下:

            ArrayList<String[]> list = new ArrayList<>();
            String[] words1 = {"Hello,", "I am", "China!"};
            String[] words2 = {"Hello,", "I am", "Beijing!"};
            list.add(words1);
            list.add(words2);
            Flowable.fromIterable(list)
                    .flatMap(new Function<String[], Publisher<String>>() {
                        @Override
                        public Publisher<String> apply(@NonNull String[] strings) throws Exception {
                            return Flowable.fromArray(strings);
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("Consumer->accept:"+s);
    
                        }
                    });
    
    

    运行结果如下所示:

    Consumer->accept:Hello,
    Consumer->accept:I am
    Consumer->accept:China!
    Consumer->accept:Hello,
    Consumer->accept:I am
    Consumer->accept:Beijing!
    

    flatMap的转换可以分解为三个过程:

    1. 根据传入的事件生成一个Publisher对象(其实也可以理解为Flowable)。
    2. 激活该Flowable对象发送事件,而不是直接发送该Flowable对象。
    3. 同一个Flowable对象发送的事件都会汇总到Flowable后,Flowable负责将事件统一传递给subsrciber。

    3. lift

    我们可以将该方法视为map与flatMap的底层调用实现,其目的就是定义我们自己的Operator来完成变换。
    lift方法接收一个FlowableOperator的参数,这个FlowableOperator就是定义我们自己的转换操作。
    这样解释起来可能有些不太明了,下面我们举两个简单的例子来看下怎么使用lift实现map和flatMap的效果。

    • map的lift写法
        Flowable.just(filePath)
                    .lift(new FlowableOperator<Bitmap, String>() {
                        @Override
                        public Subscriber<? super String> apply(@NonNull final Subscriber<? super Bitmap> observer) throws Exception {
                            return new Subscriber<String>() {
                                @Override
                                public void onSubscribe(Subscription s) {
                                    observer.onSubscribe(s);
                                }
    
                                @Override
                                public void onNext(String s) {
                                    observer.onNext(getBitmapFromFile(new File(s)));
    
                                }
    
                                @Override
                                public void onError(Throwable t) {
                                    observer.onError(t);
                                }
    
                                @Override
                                public void onComplete() {
                                    observer.onComplete();
    
                                }
                            };
                        }
                    })
                    .subscribe(new Consumer<Bitmap>() {
                        @Override
                        public void accept(Bitmap bitmap) throws Exception {
                            showBitmap(bitmap);
                        }
                    });
    
    • flatMap的lift写法
    Flowable.fromIterable(list)
                    .lift(new FlowableOperator<String, String[]>() {
    
                        @Override
                        public Subscriber<? super String[]> apply(@NonNull final Subscriber<? super String> observer) throws Exception {
                            return new Subscriber<String[]>() {
                                @Override
                                public void onSubscribe(Subscription s) {
                                    observer.onSubscribe(s);
                                }
    
                                @Override
                                public void onNext(String[] strings) {
                                    Flowable.fromArray(strings)
                                            .subscribe(new Consumer<String>() {
                                                @Override
                                                public void accept(String s) throws Exception {
                                                    observer.onNext(s);
                                                }
                                            });
                                }
    
                                @Override
                                public void onError(Throwable t) {
                                    observer.onError(t);
    
                                }
    
                                @Override
                                public void onComplete() {
                                    observer.onComplete();
                                }
                            };
                        }
                    })
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println("accept ->"+s);
                        }
                    });
    
    

    4. range

    该方法比较简单,用于产生int和long型数字。

    
        Flowable.range(1,5)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    })
    

    输出1 2 3 4 5五个数字。

    5. merge

    主要用户合并对象,示例如下:

    ArrayList<String> list1 = new ArrayList<>();
            list1.add("1");
            list1.add("2");
            list1.add("3");
            ArrayList<String> list2 = new ArrayList<>();
            list2.add("4");
            list2.add("5");
            list2.add("6");
    
            Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
    
    

    输出1 2 3 4 5 6。

    6. compose

    调解转换的作用,示例如下:

        Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
    
                    .compose(new FlowableTransformer<String, Integer>() {
    
                        @Override
                        public Publisher<Integer> apply(@NonNull Flowable<String> upstream) {
    
                            return upstream.map(new Function<String, Integer>() {
                                @Override
                                public Integer apply(@NonNull String s) throws Exception {
                                    return Integer.parseInt(s);
                                }
                            });
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer s) throws Exception {
                            System.out.println(s);
                        }
                    });
    
    
    

    输出1 2 3 4 5 6 六个数字。

    7. compose与lift的区别

    两者都实现了变换的功能,但是变换的内容和对象却不相同。

    • lift实现的是对事件和事件序列的变换。
    • compose实现的是Flowable本身的变换。

    5 总结

    至此,我们对RxJava的使用分析告一段落,作为一个牛逼的异步框架,如果能正确的引入到我们的项目中来一定能提高我们效率,降低后期我们的维护成本。
    祝各位工作愉快。

    相关文章

      网友评论

        本文标题:RxJava(2.0)-你可能需要知道这些

        本文链接:https://www.haomeiwen.com/subject/gmqzsxtx.html