rxjava

作者: Android_冯星 | 来源:发表于2017-12-11 11:06 被阅读21次

    Rxjava到底是什么

    一个词:异步
    一个可以在java VM上使用可观测的序列来组成异步的、基本事件的程序库
    一个实现异步操作的库

    RXJava优缺点

    简洁
    随着程序的逻辑变得越来越复杂,它依然能够保持简洁。

    API介绍和原理解析

    1.概念:扩展的观察者模式

    RXjava的异步实现,是通过一种扩展的观察者模式

    观察者模式

    观察者模式的面向需求是:对象A(观察者)对对象B(被观察者)的某种变化,高度敏感,需要在B变化的一瞬间做出反应。
    观察者模式采用注册(register)或者成为订阅(subscrible)的方式,告诉被观察者,我需要你的某某状态,你要在它变化的时候告诉我。
    Android开发中典型的例子就是view的点击监听器OnClickLinstener()。对设置onClickListener来说,view是被观察者,OnClickListener是观察者,二者通过setOnClickListener完成订阅关系。订阅完成之后,用户点击view的瞬间,Android Framework就会将点击事件交给已经注册的onClickListener采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。

    OnClickListener 的模式大致如下图:

    [图片上传失败...(image-2917dc-1512961567324)]

    如图所示,通过setOnClickListener方法Button持有OnClickListener的引用,当用户点击Button,自动调用OnclickListener里的onClick方法。
    把图片抽象出来 Button->被观察者 OnClickListener->观察者 setOnClickListener->订阅 onClick->事件。就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。

    [图片上传失败...(image-ed19bb-1512961567324)]

    而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

    RXJava的观察者模式

    RXjava有4个感念:

    • Observable 被观察者
    • Observer 观察者
    • subscribe 订阅
    • 事件

    Observable和Observer通过Subscribe方法实现订阅,从而Observable可以在需要的时候发出事件通知Observer。

    与传统的观察者模式不同,除了普通事件onNext() (相当于onClick/OnEvent),还定义两个特殊的事件onCompleted(),onError | completed 完成 完整的|

    • onComplete() 事件队列完结。RXJava不仅把每个事件单独处理,还会把他们看成一个队列RXJava规定,如果没有新的onNext()方法发出时,必须出发onCompleted方法作为完成标志。
    • onError() 事件队列异常,在事件处理过程中出现异常,会触发onError(),并且整个事件终止,不允许在有事件发出。
    • 在一个正确运行事件序列中,onCompleted,onError有且只有一个会被调用,而且是事件中最后一个方法,两个方法是互斥的。即在队列中调用了其中一个,就不应该再调用另一个。

    RxJava 的观察者模式大致如下图:

    [图片上传失败...(image-f381dc-1512961567324)]

    基本实现

    基于以上的概念, RxJava 的基本实现主要有三点:

    创建Observer 观察者

    决定着事件触发将有怎样的行为

     Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    不仅基本使用方式一样,实质上,在RxJava的subscribe过程中,Observer也总是会先被转换成Subscriber在使用。所以使用基本功能,选择Observer或者subscriber都是一样的。他们的区别有两点。

    1. onStart() 这是subscriber新增方法,他会在subscribe刚开始,但是事件还没有发送之前被调用,可以用于做一些准备工作,例如数据清零或者重置,这是一个可选的方法。默认实现为空。但是如果对工作线程有要求的话(例如弹出一个对话框,需要在Ui线程执行),就不能使用onStart(),因为他总是调用在subscribe所发生的线程调用,而不能指定线程。如果指定线程来做准备工作,可以使用doOnSubscribe()方法。
    2. unSubscribe 这是Subscriber所实现的另一个接口Subscription()方法,用于取消订阅,在这个方法调用后Subscriber将不接受任何事件。一般在调用之前先使用isUnSubscribed先判断一下状态,unSubscribe()这个方法很重要,因为在subscribe之后,Observable会持有Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以要保持良好的原则,要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

    创建Observeable

    Observable是被观察者,决定在什么时候被触发,和触发什么事件。
    RXJava使用create()方法来创建一个Observable,并为他设置事件触发规则。

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    

    这里传入OnSubScribe对象作为参数,OnSubscribe会被存入染回的Observable对象中,相当于一个计划表,当OnSubscribeObservable被订阅时,OnSubscriable的call方法会被自动调用,事件序列会按照设定依次调用onNext方法和OnCompleted方法,这样,由被观察者调用了观察者的回调方法,就实现了被观察者向观察者的事件传递,即观察者模式

    create方法是RXJava中最基本的创造事件序列的方法。RXJava还提供了一些方法用来快捷创建事件队列,例如:

    • just<T...> 将传入的参数依次发送出来。
    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    
    • from(T...) / from(Iterable<? extends T> 将传入的数组 或者 Iterable 拆分成具体对象后,依次发送出来。
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    

    上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

    Subcsribe订阅

    创建了Observable和Observe之后,用subscribe方法将他们链接起来,整条链子就可以工作了。

    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
    

    Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }
    

    subscribe做了三件事

    1. 调用的Subscriber的onStart方法,可选的准备方法。
    2. 调用observable中的Call方法。在这里,事件发送的逻辑开始运行。在RXjava中Observable不是在创建时候就立即发送事件,而是在他订阅的时候,即放subscribe执行的时候。
    3. 将传入的subscribe作为Subscription返回,为了方便unSubscribe。

    整个关系如下

    [图片上传失败...(image-17e6e0-1512961567324

    或者

    [图片上传失败...(image-7845b8-1512961567324)]

    除了subscribe(Observer) 或者 subscribe(subscriable),subscribe还支持不完整定义的回调,RXJava会自动创建出Subscriber

    Action1<String> onNextAction = new Action1<String>() {
        // onNext()
        @Override
        public void call(String s) {
            Log.d(tag, s);
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
        // onError()
        @Override
        public void call(Throwable throwable) {
            // Error handling
        }
    };
    Action0 onCompletedAction = new Action0() {
        // onCompleted()
        @Override
        public void call() {
            Log.d(tag, "completed");
        }
    };
    
    // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
    observable.subscribe(onNextAction);
    // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
    observable.subscribe(onNextAction, onErrorAction);
    // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
    observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    

    Action0是RxJava中的一个接口,它只有一个方法Call,这个方法是一个无参无返回值的方法,由于onCompleted也是无参无返回值的,因此action可以当成一个包装对象,将onCompleted内容打包起来将自己作为一个参数传入subscribe中,以实现不完整定义的回调。也可以看做将onCompleted方法传递进了subscribe,相当于某些语言中的闭包。

    Action1也是一个接口,他同样也只有一个方法Call(T param),这个方法无返回值,但是有一个参数,与Action0同理,由于onNext onError也是只有一个单参数,且没有返回值,因此Action1可以将OnNext(obj)和onError(error)打包起来传入subscribe中,以实现不完整定义的回调,事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

    场景事例

    打印字符数组

    将字符串数组 names 中的所有字符串依次打印出来:

     String[] names = {"冯星","曹操","赵云","马超"};
            rx.Observable.from(names).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.d(TAG, "call: " +s);
                }
            });
    

    由 id 取得图片并显示

    由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

    Observable<Drawable> observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
                @Override
                public void call(Subscriber<? super Drawable> subscriber) {
                    subscriber.onNext(getResources().getDrawable(R.mipmap.water));
                    subscriber.onCompleted();
                }
            });
            observable.subscribe(new Subscriber<Drawable>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_SHORT).show();
                }
    
                @Override
                public void onNext(Drawable drawable) {
                    iv_demo.setImageDrawable(drawable);
                }
            });
    

    正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

    [图片上传失败...(image-9074f5-1512961567324)]

    在RXjava默认规则里,事件的发出和消费都在同一个线程里。也就是说上面是一个同步的观察者模式。

    而观察者模式本身的目的在于 后台处理,前台调用 的异步机制。因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

    线程控制 -- Scheduler |si gan diu le| 调度

    在不指定线程的情况下,RXjava遵循的是线程不变得原则,

    即,在那个线程调用Subscribe,就在哪个线程生产事件;在哪个线程生产的事件,就在那个线程消费事件,

    如果需要切换线程,就需要用到 Scheduler (调度器)。

    Schedule的API

    在RXjava中,schedule--调度器,相当于线程控制器,RXJava通过它来指定每一段代码应该运行哪一个线程。

    • Schedulers.immediate() 运行在当前线程,相当于不指定线程。这是默认的schedule。 |ai mi dei rui te| 立即的 立刻的
    • Schedulers.newThread() 总是启用新线程。并在新线程执行操作。
    • Schedulers.io() io操作(读写文件,读写数据库,网络信息交互等)所使用的schedule。行为模式和newThread差不多。区别在于Io的内部实现是用一个无数量上线的线程池,可以重复利用闲置的线程。因此多数情况下,io要比newThread更有效。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation() 计算时使用的schedule。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形计算。这个Scheduler使用固定的线程池,大小为CPU的核数,不要把I/O放在computation中,否则I/O操作的等待时间会浪费CPU
    • AndroidSchedules.mainThread() Android专用线程,他指定的操作将在主线程中运行。

    subscribeOn() 指定subscribe()所发生的线程,即Observable.OnSubscribe()被激活时所发生的线程。或者叫做事件产生的线程。

    ObserveOn()指定subscriber所发生的线程或者叫做事件消费的线程。

    Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
        .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });
    

    由于subscribeOn(Schedulers.io())的指定,被创建的事件1,2,3,4,将会在在Io线程发出。
    由于observeOn(AndroidSchedulers.mainThread())的指定,因此subscriber的数字打印将发生在主线程。

    事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

    Schedule的原理

    下面呢

    变换

    RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一

    所谓变换,就是将事件序列中的对象或者整个序列进行加工处理,转换成不同的事件或者事件序列。

    API

    map()

    Observable.just(R.mipmap.water)
                    .map(new Func1<Integer, Bitmap>() {
                        @Override
                        public Bitmap call(Integer s) {
                            return BitmapFactory.decodeResource(getResources(),s);
                        }
                    })
                    .observeOn(Schedulers.io())
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Bitmap>() {
                        @Override
                        public void call(Bitmap bitmap) {
                            iv_demo.setImageBitmap(bitmap);
                        }
                    });
    

    Func1和Action1非常相似。也是RXJava中的一个接口。用于包装有一个参数的方法。Func1和Action1的区别在于,Func1是由返回值得。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

    map()方法将参数中的string对象转换成BitMap对象后返回,经过map方法后,事件的参数也随之变成的bitmap

    • map():事件对象的直接变换,是最常见的转换。

    [图片上传失败...(image-2a65d2-1512961567324)]

    • flatMap() 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
    Observable.from(students)
                    .map(new Func1<Student, String>() {
                        @Override
                        public String call(Student student) {
                            return student.getName();
                        }
                    })
                    .observeOn(Schedulers.io())
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.d(TAG, "call: "+s);
                        }
                    });
    

    很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

     Observable.from(students)
                    .subscribe(new Action1<Student>() {
                        @Override
                        public void call(Student student) {
                            List<Course> courses = student.getCourses();
                            for(Course course : courses){
                                Log.d(TAG, "onNext: "+ course.getName() + "    Student : " +student.getName());
                            }
                        }
                    });
    

    依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢? 这时候就需要flatMap了

    Observable.from(students)
                    .flatMap(new Func1<Student, Observable<Course>>() {
                        @Override
                        public Observable<Course> call(Student student) {
                            return Observable.from(student.getCourses());
                        }
                    }).subscribe(new Action1<Course>() {
                @Override
                public void call(Course course) {
                    Log.d(TAG, "call: " + course.getName());
                }
            });
    

    flatMap和map有个相同点:也是把传入的参数转化之后返回另一个对象。但是需要注意的是flatMap返回的对象是Observable对象。并且这个Observable对象不是直接发送到了Subscriber的回调方法中。

    flatMap的原理是这样的

    1. 使用传入的事件对象创建一个Observable对象。
    2. 并不发送这个Observable对象,而是将它激活,于是它开始发送事件。
    3. 每一个创建出来的Observable发送的事件,都汇入同一个Observable对象,而这个observable对象负责将这些事件传入Subscriber对象。

    这三个步骤吧事件分成了两级,通过一组新创建的Observable将初始的对象铺平,之后通过统一路径分发下去,而这个『铺平』就是 flatMap() 所谓的 flat

    flatMap()示意图

    [图片上传失败...(image-a79136-1512961567324)]

    扩展

    由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

    networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
        .flatMap(new Func1<String, Observable<Messages>>() {
            @Override
            public Observable<Messages> call(String token) {
                // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
                return networkClient.messages();
            }
        })
        .subscribe(new Action1<Messages>() {
            @Override
            public void call(Messages messages) {
                // 处理显示消息列表
                showMessages(messages);
            }
        });
    

    传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

    • throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。 |si rou te| 喉咙 压制 节流 减速 窒息

    变换的原理 lift()

    这些变化虽然功能不一样,但实质上都是针对事件的处理在发送。而在RXjava的内部,他们都是基于同一个基础的方法变化,lift(Operator)。

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber); // 这个onSubscribe是原始的OnSubScribe对象!!
            }
        });
    }
    

    这段代码,它生成了一个新的Observable并且返回,新创建的Observable中的参数OnSubscribe的回调方法call()的实现,和Observable.Subscribe()基本一样,但是是由区别的。

    不一样的地方在与OnSubscrvable中call(subscribe)所指代了对象不同

    • 当使用lift方法时,
    1. 假设有一个Observable<T>调用了lift()并创建Observable后,一共有个Observable。
    2. 同样的Observable的参数OnSubscribe,加上之前原始的Observable里面的原始OnSubscribe,也就有了两个 OnSubscribe;
    3. 然后调用Observable.create传入Observable<R>,触发onSubscribe的Call方法,也是就override的方法,
    4. 在该方法中 调用了OnSubscribe.call()方法,注意:这个OnSubscribe方法是原始的Observable<T>的onSubscribe<T>对象。他需要传入一个Subscriber对象,这个对象是通过Subscriber newSubscriber = operator.call(subscriber);operator.call()方法生成的新的Subscribe。正是这个operator对象将两个Subscriber对象联系起来的。OnSubscribe<T>在执行Subscriber<R>.onNext(R r),而这里从T变成R,正好用到了传到Operator中的参数Func1<T, R>。

    这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

    也可以这个说:在Observable执行了lift(Operator)方法后,会返回一个新的Observable,这个新的Observable会象一个代理一样,负责接受原始的Observable发出的事件,并在处理后发送给Subscriber

    [图片上传失败...(image-8fc12c-1512961567324)]

    [图片上传失败...(image-925ff1-1512961567324)]

    多次调用

    [图片上传失败...(image-eb30f8-151296156732

    举个例子

    Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
            .map(new Func1<Float, Integer>() {
                @Override
                public Integer call(Float aFloat) {
                    return Math.round(aFloat);
                }
            })
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return Integer.toBinaryString(integer);
                }
            })
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    log("2 map onNext->" + s);
                }
            });
    
    // outputs
    // 2 map onNext->1
    // 2 map onNext->1000
    // 2 map onNext->11111111111111111111110111101010
    // 2 map onNext->110001001
    

    该例子是一个Float->Integer->String的转换。我们按上面的流程来分析。

    1. 生成一个Observable<Float>
    2. 调用map生成Observable<Integer>
    3. 调用map生成Observable<String>
    4. subscribe()传入一个Subscribe(String),至此流的前半部分全部完成。
    5. 执行开始,Subscribe<String>发送事件,先生成一个Subscrver<Integer>传给Observable<Integer>(Observable<Integer>.onSubscribe.call())。
    6. Observable<Interger>开始发送事件,同样生成一个Subscriber<Float>传给Observable<Float>(Observable<Float>.onSubscribe.call())。
    7. 真正的发送事件开始,Observable<Float>调用Subscriber<Float>.onNext(Float)等方法,同时Subscriber<Integer>.onNext(Integer)被调用,同时Subscriber<String>.onNext(String)被调用,事件发送完成。

    compose对Observable整体的变换 |com pou si| 构成 组成

    除了lift方法外,Observable还有一个变换方法叫 compose(Transformer)它和lift的区别在于lift是针对事件项和事件序列,而compose是针对observable自身进行变换。

    假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。

    public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
        @Override
        public Observable<String> call(Observable<Integer> observable) {
            return observable
                .lift1()
                .lift2()
                .lift3()
                .lift4();
        }
    }
    ...
    Transformer liftAll = new LiftAllTransformer();
    observable1.compose(liftAll).subscribe(subscriber1);
    observable2.compose(liftAll).subscribe(subscriber2);
    observable3.compose(liftAll).subscribe(subscriber3);
    observable4.compose(liftAll).subscribe(subscriber4);
    

    像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

    Scheduler的API(二)

    利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。

    能不能可以多次切换线程
    答案是能。因为ObserveOn指定的是

    操作符分类

    面我按类别把常用操作符分别介绍,其实很多内容都是来自于ReactiveX的官方网站,英文比较好的朋友可以参考(http://reactivex.io/)。
    按照官方的分类,操作符大致分为以下几种:

    Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
    Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
    Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
    Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
    Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
    Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
    Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
    Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
    其他如observable.toList()、observable.connect()、observable.publish()等等;

    相关文章

      网友评论

        本文标题:rxjava

        本文链接:https://www.haomeiwen.com/subject/ybxiixtx.html