1. 什么时候使用Flowable,什么时候使用Observable?
下面是官方文档(原文)的直接翻译:
一个小遗憾是,在RxJava 0.x引入背压(backpressure)
时,并没有使用一个独立的基础reactive类,而是直接在Observable的基础上进行改进了。
背压
的主要问题是,许多hot sources比如UI event,它们不能合理地被背压,然后导致我们不想看到的MissingBackpressureException
注:hot sources直译是热来源,其真实表示的是hot Observable,也就是冷热Observable中的热Observable
对于这种情况,我们在2.x中这样尝试补救:将io.reactivex.Observable
设为非背压(non-backpressured)
,同时增加一个新的基础reactive类io.reactivex.Flowable
,而这个类是可背压的(backpressure-enabled)
好消息是2.x的操作符几乎与之前保持一致。坏消息是大家在自动导包(organize imports)时需要注意,不要无意中就选择了不支持背压的io.reactivex.Observable
- 注:项目如果使用了2.x之前的版本的RxJava,即使有些场景需要背压,但当时只能使用
io.reactivex.Observable
;所以当迁移到2.x时,要注意将这部分代码改成使用io.reactivex.Flowable
,因为前者在 2.x时不支持背压。
什么时候使用Observable?
- 当你的数据流最多也不会超过1000个元素(element)时,也就是说一段时间内只有很少的元素发射,所以你的应用不大可能发生内存溢出。
- 当你处理GUI事件,例如鼠标或者触摸事件时,这些事件很难被合理地背压,并且不会频繁的发生。你可以使用Observable去处理频率小于等于1000赫兹的元素发送,并且尽量考虑使用sampling/debouncing 等操作符。
- 本来你的数据流是异步的,但你的平台不支持Java流或者你分不清该使用
Observable
还是Flowable
时,使用Observable
比Flowable
有更小的开销。
什么时候使用Flowable?
- 当处理超过10k的元素,这些元素生成自某处并且具备某些特性,因此数据链(Chain)可以告诉来源(Source)去限制生成量。
- 读取或者解析来自硬盘的文件自然而然地会产生阻塞(blocking),并且是基于拉取式(pull-based)的。在你的控制下,同样可以很好地处理好背压。比如:在特定的请求量下,你会读取多少行的数据。
- 通过JDBC读取数据库同样是基于拉取式并且会产生阻塞,但是你可以通过调用
ResultSet.next()
得到很好的控制,并且通过它,几乎可以应对每一条下流的请求。 - 网络(流)IO:网络请求或者是一些支持请求逻辑量的协议。
- 一些阻塞和/或基于拉取式的数据源,但是未来也许会提供非阻塞响应式的API或者驱动。
2. Consumer,Function
这里说的两个类指的是io.reactivex.functions
包下的:
先说Consumer:
2.x的
Consumer
等于 1.x的Action1
2.x的
BiConsumer
等于 1.x的Action2
2.x的
Consumer<Object[]>
等于 1.x的ActionN
2.x的
Action
等于 1.x的Action0
2.x中 没有 1.x中的
Action3
~Action9
再说Function:
2.x的
Function
等于 1.x的Func
2.x的
BiFunction
等于 1.x的Func2
2.x的
Function3
~Fucntion9
等于 1.x的Func3
~Func9
2.x的
Function<Object[], R>
等于 1.x的FuncN
很明显1.x的命名不太规范,2.x中采用通用的相对合理的命名。
然而2.x命名规范并不是RxJava自己设计的,而是与Java1.8中相同功能的同名类的命名保持一致(不仅是类名,还有方法名)
Java1.8中新增一个包:java.util.function
来大体浏览下这个包(非所有类)
可以看出:Java1.8中没有Action
,Function3
~Function9
。
Consumer的作用
Consumer描述了这样的一种操作(operation):接收一个传入的参数(argument),并且不返回结果(result)。
使用Consumer的目的是,根据传入的值,来做相应的事。所以重点是accept
方法:
* @since 1.8
*/
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
同理:BiConsumer是接收两个传入的参数,并且不返回结果
* @since 1.8
*/
@FunctionalInterface
public interface BiConsumer<T, U> {
/**
* Performs this operation on the given arguments.
*
* @param t the first input argument
* @param u the second input argument
*/
void accept(T t, U u);
上面2个源码是Java1.8的,因为注释比较详细。。。
下面看下RxJava2.x的:
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
/**
* A functional interface (callback) that accepts two values (of possibly different types).
* @param <T1> the first value type
* @param <T2> the second value type
*/
public interface BiConsumer<T1, T2> {
/**
* Performs an operation on the given values.
* @param t1 the first value
* @param t2 the second value
* @throws Exception on error
*/
void accept(T1 t1, T2 t2) throws Exception;
}
/**
* A functional interface similar to Runnable but allows throwing a checked exception.
*/
public interface Action {
/**
* Runs the action and optionally throws a checked exception.
* @throws Exception if the implementation wishes to throw a checked exception
*/
void run() throws Exception;
}
对于Consumer,RxJava2.x与Java1.8最大的区别是:
accept
方法默认都会抛出Exception
,这也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一个andThen
方法。)
Function的作用
Function描述了这样的一种功能(function):接收一个参数(argument),并且产生一个结果(result)。
使用Function的目的是,根据传入的值,来输出一个值。所以重点是apply
方法:
* @since 1.8
*/
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
同理:BiFunction是接收两个参数,并且产生一个结果
* @see Function
* @since 1.8
*/
@FunctionalInterface
public interface BiFunction<T, U, R> {
/**
* Applies this function to the given arguments.
*
* @param t the first function argument
* @param u the second function argument
* @return the function result
*/
R apply(T t, U u);
同样也比较下Rxjava2.x的:
/**
* A functional interface that takes a value and returns another value, possibly with a
* different type and allows throwing a checked exception.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
/**
* A functional interface (callback) that computes a value based on multiple input values.
* @param <T1> the first value type
* @param <T2> the second value type
* @param <R> the result type
*/
public interface BiFunction<T1, T2, R> {
/**
* Calculate a value based on the input values.
* @param t1 the first value
* @param t2 the second value
* @return the result value
* @throws Exception on error
*/
@NonNull
R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
/**
* A functional interface (callback) that computes a value based on multiple input values.
* @param <T1> the first value type
* @param <T2> the second value type
* @param <T3> the third value type
* @param <R> the result type
*/
public interface Function3<T1, T2, T3, R> {
/**
* Calculate a value based on the input values.
* @param t1 the first value
* @param t2 the second value
* @param t3 the third value
* @return the result value
* @throws Exception on error
*/
@NonNull
R apply(@NonNull T1 t1, @NonNull T2 t2, @NonNull T3 t3) throws Exception;
}
同样,对于Function,RxJava2.x与Java1.8最大的区别是:
apply
方法默认都会抛出Exception
,这也是2.x新加入的特性。
(另外稍微提下,后者比前者多了一个andThen
方法。)
最后,大家可能有疑问:
BiConsumer
和BiFcuntion
中Bi
是什么意思?
答案就是:Binary
。当然这里不是"二进制"的意思,而是"两"个参数的意思。
3. 观察者和被观察者
来看段经典的设计模式书籍---Head First怎么说?
出版者 + 订阅者 = 观察者模式
如果你了解报纸的订阅是怎么回事,其实就知道观察者模式是怎么回事,只是名称不太一样:出版者改称为“主题”(Subject),订阅者改称为“观察者”(Observer)
所以上面出现四个关键字:
Publisher(出版者) -> Subject(主题)
Subscriber(订阅者) -> Observer(观察者)
比较:Observable,ObservableSource,Flowable,Publisher
2.x 和 1.x中Observable
的父类都是Object
,
但前者额外实现了ObservableSource
接口,里面只有一个方法:
void subscribe(@NonNull Observer<? super T> observer)
两者在2.x 和 1.x 的包名分别为:io.reactivex
和 rx
。
与2.x的Observable
相似,Flowbale
也实现了一个接口,
该接口就是Publisher
! 同样只有一个方法:
void subscribe(Subscriber<? super T> s);
需要注意的是,Publisher
并没有在io.reactivex
包内,而是在org.reactivestreams
内,该包内只有4个类,但每个都非常重要:
比较:Observer,Subscriber,Subscription,Disposable
先看下1.x的Observable
的subscribe()
方法:
如上图所示:每个方法subscribe()
的方法都返回Subscription
对象
Subscription
是一个接口,就2个方法:unsubscribe()
和isUnsubscribed()
然后看下Observer
,同样是一个接口,就那3个最常用的方法:onNext()
,onComplete()
,onError()
Subscriber
刚好同时实现了Observer
和Subscription
:
public abstract class Subscriber<T> implements Observer<T>, Subscription
虽然Observable
在subscribe()
时可以传入Observer
,但实际处理时,会先把Observer
转为Subscriber
:
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
上面是对于1.x的Observer
,Subscription
,Subscriber
的简单理解,前两者是接口,最后一个是抽象类,并且都在rx
包内。
来看下2.x,先是Observable
的subscribe()
方法:
与1.x的不同点:
返回值是
Disposable
而不再是Subscription
。
当subscribe
的是Observer
时,并没有返回值。
并且不能1.x那样subscribe
一个Subscriber
。
先看下Disposable
,只有两个方法:dispose()
和isDisposed()
。看到这里应该会有下意识的反应:Disposable
与1.x的Subscription
如出一辙!很明显,它确实是这个作用。
然后我们来看下Observer
,它仍然是一个接口,但比1.x多出一个方法:
void onSubscribe(@NonNull Disposable d);
到此,就可以解释为什么当Observable
subscribe
的是Observer
时没有返回值,因为Observer
内部的方法已经提供了Disposable
的引用。
同时,还由于Disposable
与1.x的Subscription
的作用相同,而1.x的Subscriber
实现了Observer
和Subscription
,所以实际上,2.x的Observer
扮演了1.x中Subscriber
的角色!
下面再看下Flowable
:
如上图所示:
Flowable
的前5个方法同Observable
的一致,返回值都Disposable
。
但是Flowable
只能subscribe
的是Subscriber
,而非Observer
。
(FlowableSubscriber
是Subscriber
的子类,也是个接口,两者区别是前者的onSubscribe
方法不可以传空对象,后者可以),
所以我们重点看下2.x的Subscriber
:
2.x的Subscriber
是个接口(1.x是抽象类),与2.x的Observer
非常相似,除了onNext
,onComplete
,onError
外,还有:
void onSubscribe(Subscription s);
这里又出现了Subscription
,同样有2个方法,但是不同于1.x的unsubscribe()
和isUnsubscribed()
,以及2.xDisposable
的dispose()
和isDisposed()
,它的两个方法是:
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
*
* It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
* An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
*
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
*
* A {@link Publisher} can send less than is requested if the stream ends but
* then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}.
*
* @param n the strictly positive number of elements to requests to the upstream {@link Publisher}
*/
void request(long n);
和
/**
* Request the {@link Publisher} to stop sending data and clean up resources.
*
* Data may still be sent to meet previously signalled demand after calling cancel.
*/
void cancel();
request()
的重要性及作用就不多说了,说下cancel()
:
Disposable
的dispose()
和1.x Subscription
的unsubscribe()
仅仅是不再接收上流数据,并不影响上流数据的发送。
而2.x Subscription
中cancel()
所做的是,直接让上流来源停止发送数据,并且清空数据。
最后还有小注意点:
2.x的
Observable
和Flowable
当subscribe
的都是Consumer
的时候,两者的返回值都是Disposable
。而2.x的
Observer
和Subscriber
的区别只有onSubscribe
方法(都有onNext
,onComplete
,onError
三个方法),且两者onSubscribe
方法的区别只是:前者的接收的参数是Disposable
,而后者是Subscription
。也就是说Subscription
只有在使用Subscriber
时才会用到,而Subscriber
只有在使用Flowable
时才会用到。
所以Subscription
只有在使用Flowable
时才会用到。
另外:2.xSubscriber
和Subscription
并不在io.reactivex
内,
而是在org.reactivestreams
中(就是介绍Publisher
时贴出的4个类)。
比较:Subject
这个就简单了,1.x 中的Subject
与 2.x 中的Subject
所用是一致的:
Represents an Observer and an Observable at the same time
但由于2.x中加入了Flowable
,也就意味着2.x中的Subject
的覆盖范围没有1.x中那么广。两者分别位于2.x 和 1.x 的包:io.reactivex.subjects
和 rx.subjects
。
4. 操作符的决策树
源自:A Decision Tree of Observable Operators
我想创建一个
Observable
只需要发射一个item:Just
网友评论