Rxjava到底是什么
一个词:异步
一个可以在java VM上使用可观测的序列来组成异步的、基本事件的程序库
一个实现异步操作的库
RXJava优缺点
简洁
随着程序的逻辑变得越来越复杂,它依然能够保持简洁。
API介绍和原理解析
1.概念:扩展的观察者模式
RXjava的异步实现,是通过一种扩展的观察者模式
观察者模式
观察者模式的面向需求是:对象A(观察者)对对象B(被观察者)的某种变化,高度敏感,需要在B变化的一瞬间做出反应。
观察者模式采用注册(register)或者成为订阅(subscrible)的方式,告诉被观察者,我需要你的某某状态,你要在它变化的时候告诉我。
Android开发中典型的例子就是view的点击监听器OnClickLinstener()。对设置onClickListener来说,view是被观察者,OnClickListener是观察者,二者通过setOnClickListener完成订阅关系。订阅完成之后,用户点击view的瞬间,Android Framework就会将点击事件交给已经注册的onClickListener采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
OnClickListener 的模式大致如下图:
[图片上传失败...(image-2917dc-1512961567324)]
如图所示,通过setOnClickListener方法Button持有OnClickListener的引用,当用户点击Button,自动调用OnclickListener里的onClick方法。
把图片抽象出来 Button->被观察者 OnClickListener->观察者 setOnClickListener->订阅 onClick->事件。就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。
[图片上传失败...(image-ed19bb-1512961567324)]
而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。
RXJava的观察者模式
RXjava有4个感念:
- Observable 被观察者
- Observer 观察者
- subscribe 订阅
- 事件
Observable和Observer通过Subscribe方法实现订阅,从而Observable可以在需要的时候发出事件通知Observer。
与传统的观察者模式不同,除了普通事件onNext() (相当于onClick/OnEvent),还定义两个特殊的事件onCompleted(),onError | completed 完成 完整的|
- onComplete() 事件队列完结。RXJava不仅把每个事件单独处理,还会把他们看成一个队列RXJava规定,如果没有新的onNext()方法发出时,必须出发onCompleted方法作为完成标志。
- onError() 事件队列异常,在事件处理过程中出现异常,会触发onError(),并且整个事件终止,不允许在有事件发出。
- 在一个正确运行事件序列中,onCompleted,onError有且只有一个会被调用,而且是事件中最后一个方法,两个方法是互斥的。即在队列中调用了其中一个,就不应该再调用另一个。
RxJava 的观察者模式大致如下图:
[图片上传失败...(image-f381dc-1512961567324)]
基本实现
基于以上的概念, RxJava 的基本实现主要有三点:
创建Observer 观察者
决定着事件触发将有怎样的行为
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
不仅基本使用方式一样,实质上,在RxJava的subscribe过程中,Observer也总是会先被转换成Subscriber在使用。所以使用基本功能,选择Observer或者subscriber都是一样的。他们的区别有两点。
- onStart() 这是subscriber新增方法,他会在subscribe刚开始,但是事件还没有发送之前被调用,可以用于做一些准备工作,例如数据清零或者重置,这是一个可选的方法。默认实现为空。但是如果对工作线程有要求的话(例如弹出一个对话框,需要在Ui线程执行),就不能使用onStart(),因为他总是调用在subscribe所发生的线程调用,而不能指定线程。如果指定线程来做准备工作,可以使用doOnSubscribe()方法。
- unSubscribe 这是Subscriber所实现的另一个接口Subscription()方法,用于取消订阅,在这个方法调用后Subscriber将不接受任何事件。一般在调用之前先使用isUnSubscribed先判断一下状态,unSubscribe()这个方法很重要,因为在subscribe之后,Observable会持有Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以要保持良好的原则,要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
创建Observeable
Observable是被观察者,决定在什么时候被触发,和触发什么事件。
RXJava使用create()方法来创建一个Observable,并为他设置事件触发规则。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
这里传入OnSubScribe对象作为参数,OnSubscribe会被存入染回的Observable对象中,相当于一个计划表,当OnSubscribeObservable被订阅时,OnSubscriable的call方法会被自动调用,事件序列会按照设定依次调用onNext方法和OnCompleted方法,这样,由被观察者调用了观察者的回调方法,就实现了被观察者向观察者的事件传递,即观察者模式。
create方法是RXJava中最基本的创造事件序列的方法。RXJava还提供了一些方法用来快捷创建事件队列,例如:
- just<T...> 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
- from(T...) / from(Iterable<? extends T> 将传入的数组 或者 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。
Subcsribe订阅
创建了Observable和Observe之后,用subscribe方法将他们链接起来,整条链子就可以工作了。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
subscribe做了三件事
- 调用的Subscriber的onStart方法,可选的准备方法。
- 调用observable中的Call方法。在这里,事件发送的逻辑开始运行。在RXjava中Observable不是在创建时候就立即发送事件,而是在他订阅的时候,即放subscribe执行的时候。
- 将传入的subscribe作为Subscription返回,为了方便unSubscribe。
整个关系如下
[图片上传失败...(image-17e6e0-1512961567324
或者
[图片上传失败...(image-7845b8-1512961567324)]
除了subscribe(Observer) 或者 subscribe(subscriable),subscribe还支持不完整定义的回调,RXJava会自动创建出Subscriber
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Action0是RxJava中的一个接口,它只有一个方法Call,这个方法是一个无参无返回值的方法,由于onCompleted也是无参无返回值的,因此action可以当成一个包装对象,将onCompleted内容打包起来将自己作为一个参数传入subscribe中,以实现不完整定义的回调。也可以看做将onCompleted方法传递进了subscribe,相当于某些语言中的闭包。
Action1也是一个接口,他同样也只有一个方法Call(T param),这个方法无返回值,但是有一个参数,与Action0同理,由于onNext onError也是只有一个单参数,且没有返回值,因此Action1可以将OnNext(obj)和onError(error)打包起来传入subscribe中,以实现不完整定义的回调,事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。
场景事例
打印字符数组
将字符串数组 names 中的所有字符串依次打印出来:
String[] names = {"冯星","曹操","赵云","马超"};
rx.Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: " +s);
}
});
由 id 取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:
Observable<Drawable> observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
subscriber.onNext(getResources().getDrawable(R.mipmap.water));
subscriber.onCompleted();
}
});
observable.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
iv_demo.setImageDrawable(drawable);
}
});
正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。
[图片上传失败...(image-9074f5-1512961567324)]
在RXjava默认规则里,事件的发出和消费都在同一个线程里。也就是说上面是一个同步的观察者模式。
而观察者模式本身的目的在于 后台处理,前台调用 的异步机制。因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。
线程控制 -- Scheduler |si gan diu le| 调度
在不指定线程的情况下,RXjava遵循的是线程不变得原则,
即,在那个线程调用Subscribe,就在哪个线程生产事件;在哪个线程生产的事件,就在那个线程消费事件,
如果需要切换线程,就需要用到 Scheduler (调度器)。
Schedule的API
在RXjava中,schedule--调度器,相当于线程控制器,RXJava通过它来指定每一段代码应该运行哪一个线程。
- Schedulers.immediate() 运行在当前线程,相当于不指定线程。这是默认的schedule。 |ai mi dei rui te| 立即的 立刻的
- Schedulers.newThread() 总是启用新线程。并在新线程执行操作。
- Schedulers.io() io操作(读写文件,读写数据库,网络信息交互等)所使用的schedule。行为模式和newThread差不多。区别在于Io的内部实现是用一个无数量上线的线程池,可以重复利用闲置的线程。因此多数情况下,io要比newThread更有效。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation() 计算时使用的schedule。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形计算。这个Scheduler使用固定的线程池,大小为CPU的核数,不要把I/O放在computation中,否则I/O操作的等待时间会浪费CPU。
- AndroidSchedules.mainThread() Android专用线程,他指定的操作将在主线程中运行。
subscribeOn() 指定subscribe()所发生的线程,即Observable.OnSubscribe()被激活时所发生的线程。或者叫做事件产生的线程。
ObserveOn()指定subscriber所发生的线程或者叫做事件消费的线程。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
由于subscribeOn(Schedulers.io())的指定,被创建的事件1,2,3,4,将会在在Io线程发出。
由于observeOn(AndroidSchedulers.mainThread())的指定,因此subscriber的数字打印将发生在主线程。
事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
Schedule的原理
下面呢
变换
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一
所谓变换,就是将事件序列中的对象或者整个序列进行加工处理,转换成不同的事件或者事件序列。
API
map()
Observable.just(R.mipmap.water)
.map(new Func1<Integer, Bitmap>() {
@Override
public Bitmap call(Integer s) {
return BitmapFactory.decodeResource(getResources(),s);
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
iv_demo.setImageBitmap(bitmap);
}
});
Func1和Action1非常相似。也是RXJava中的一个接口。用于包装有一个参数的方法。Func1和Action1的区别在于,Func1是由返回值得。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
map()方法将参数中的string对象转换成BitMap对象后返回,经过map方法后,事件的参数也随之变成的bitmap
- map():事件对象的直接变换,是最常见的转换。
[图片上传失败...(image-2a65d2-1512961567324)]
- flatMap() 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: "+s);
}
});
很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:
Observable.from(students)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
List<Course> courses = student.getCourses();
for(Course course : courses){
Log.d(TAG, "onNext: "+ course.getName() + " Student : " +student.getName());
}
}
});
依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢? 这时候就需要flatMap了
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
}).subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.d(TAG, "call: " + course.getName());
}
});
flatMap和map有个相同点:也是把传入的参数转化之后返回另一个对象。但是需要注意的是flatMap返回的对象是Observable对象。并且这个Observable对象不是直接发送到了Subscriber的回调方法中。
flatMap的原理是这样的
- 使用传入的事件对象创建一个Observable对象。
- 并不发送这个Observable对象,而是将它激活,于是它开始发送事件。
- 每一个创建出来的Observable发送的事件,都汇入同一个Observable对象,而这个observable对象负责将这些事件传入Subscriber对象。
这三个步骤吧事件分成了两级,通过一组新创建的Observable将初始的对象铺平,之后通过统一路径分发下去,而这个『铺平』就是 flatMap() 所谓的 flat
flatMap()示意图
[图片上传失败...(image-a79136-1512961567324)]
扩展
由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
- throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。 |si rou te| 喉咙 压制 节流 减速 窒息
变换的原理 lift()
这些变化虽然功能不一样,但实质上都是针对事件的处理在发送。而在RXjava的内部,他们都是基于同一个基础的方法变化,lift(Operator)。
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber); // 这个onSubscribe是原始的OnSubScribe对象!!
}
});
}
这段代码,它生成了一个新的Observable并且返回,新创建的Observable中的参数OnSubscribe的回调方法call()的实现,和Observable.Subscribe()基本一样,但是是由区别的。
不一样的地方在与OnSubscrvable中call(subscribe)所指代了对象不同
- 当使用lift方法时,
- 假设有一个Observable<T>调用了lift()并创建Observable后,一共有个Observable。
- 同样的Observable的参数OnSubscribe,加上之前原始的Observable里面的原始OnSubscribe,也就有了两个 OnSubscribe;
- 然后调用Observable.create传入Observable<R>,触发onSubscribe的Call方法,也是就override的方法,
- 在该方法中 调用了OnSubscribe.call()方法,注意:这个OnSubscribe方法是原始的Observable<T>的onSubscribe<T>对象。他需要传入一个Subscriber对象,这个对象是通过Subscriber newSubscriber = operator.call(subscriber);operator.call()方法生成的新的Subscribe。正是这个operator对象将两个Subscriber对象联系起来的。OnSubscribe<T>在执行Subscriber<R>.onNext(R r),而这里从T变成R,正好用到了传到Operator中的参数Func1<T, R>。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
也可以这个说:在Observable执行了lift(Operator)方法后,会返回一个新的Observable,这个新的Observable会象一个代理一样,负责接受原始的Observable发出的事件,并在处理后发送给Subscriber
[图片上传失败...(image-8fc12c-1512961567324)]
[图片上传失败...(image-925ff1-1512961567324)]
多次调用
[图片上传失败...(image-eb30f8-151296156732
举个例子
Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
.map(new Func1<Float, Integer>() {
@Override
public Integer call(Float aFloat) {
return Math.round(aFloat);
}
})
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return Integer.toBinaryString(integer);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
log("2 map onNext->" + s);
}
});
// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001
该例子是一个Float->Integer->String的转换。我们按上面的流程来分析。
- 生成一个Observable<Float>
- 调用map生成Observable<Integer>
- 调用map生成Observable<String>
- subscribe()传入一个Subscribe(String),至此流的前半部分全部完成。
- 执行开始,Subscribe<String>发送事件,先生成一个Subscrver<Integer>传给Observable<Integer>(Observable<Integer>.onSubscribe.call())。
- Observable<Interger>开始发送事件,同样生成一个Subscriber<Float>传给Observable<Float>(Observable<Float>.onSubscribe.call())。
- 真正的发送事件开始,Observable<Float>调用Subscriber<Float>.onNext(Float)等方法,同时Subscriber<Integer>.onNext(Integer)被调用,同时Subscriber<String>.onNext(String)被调用,事件发送完成。
compose对Observable整体的变换 |com pou si| 构成 组成
除了lift方法外,Observable还有一个变换方法叫 compose(Transformer)它和lift的区别在于lift是针对事件项和事件序列,而compose是针对observable自身进行变换。
假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。
Scheduler的API(二)
利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。
能不能可以多次切换线程
答案是能。因为ObserveOn指定的是
操作符分类
面我按类别把常用操作符分别介绍,其实很多内容都是来自于ReactiveX的官方网站,英文比较好的朋友可以参考(http://reactivex.io/)。
按照官方的分类,操作符大致分为以下几种:
Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;
网友评论