RxJava的另一个好处在于,我们可以清楚地看到数据是如何在一系列操作符之间进行转换的。
ReactiveX/RxJava文档中文版
可观察对象(Observables)
观察者(observers)
桥梁或者代理(Subject)
AsyncSubject
BehaviorSubject
PublishSubject
ReplaySubject
串行化
如果你把
Subject
当作一个Subscriber
使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
要避免此类问题,你可以将 Subject
转换为一个 SerializedSubject
,类似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
调度器 (Scheduler)
RxJava示例
调度器的种类
下表展示了RxJava中可用的调度器种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation(); |
Schedulers.io( ) | 默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
操作符 (Operators)
- [Creating 创建型]
Create
,Defer
,Empty
/Never
/Throw
,From
,Interval
,Just
,Range
,Repeat
,Start
,Timer
- [Transforming 变换型]
Buffer
,FlatMap
,GroupBy
,Map
,Scan
,Window
- [Filtering 过滤型]
Debounce
,Distinct
,ElementAt
,Filter
,First
,IgnoreElements
,Last
,Sample
,Skip
,SkipLast
,Take
,TakeLast
- [Combining 组合型]
And
/Then
/When
,CombineLatest
,Join
,Merge
,StartWith
,Switch
,Zip
- [Error Handling 容错型]
Catch
,Retry
- [Utility 辅助操作 工具型]
Delay
,Do
,Materialize
/Dematerialize
,ObserveOn
,Serialize
,Subscribe
,SubscribeOn
,TimeInterval
,Timeout
,Timestamp
,Using
- [Conditional and Boolean 条件与布尔操作 条件型]
All
/Amb
/Contains
,DefaultIfEmpty
,SequenceEqual
,SkipUntil
/SkipWhile
,TakeUntil
/TakeWhile
- [Mathematical and Aggregate 算术与聚合操作 聚合型]
Average
/Concat
/Reduce
,Max
/Min
/Count
/Sum
- [Async 异步操作]
Start
,ToAsync
,StartFuture
,FromAction
,FromCallable
,RunAsync
- [Connect 连接操作]
Connect
,Publish
,RefCount
,Replay
- [Convert 转换型]
ToFuture
,ToList
,ToMap
,ToIterable
,toMultiMap
- [Blocking 阻塞操作]
ForEach
,First
,Last
,MostRecent
,Next
,Single
,Latest
- [String 字符串操作]
ByLine
,Decode
,Encode
,From
,Join
,Split
,StringConcat
Transformer
转换器
Observable/Flowable/Single/Completable/Maybe 对象转换成另一个Observable/Flowable/Single/Completable/Maybe 对象
- RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer
- RxJava2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。
compose
-
compose()是唯一一个能够从数据流中得到原始Observable<T>的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用compose()来实现。
-
flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap()中创建的Observable起作用,而不会对剩下的流产生影响
-
当创建Observable流的时候,compose()会立即执行,犹如已经提前写好了一个操作符一样,
-
而flatMap()则是在onNext()被调用后执行,onNext()的每一次调用都会触发flatMap(),
-
也就是说,flatMap()转换每一个事件,而compose()转换的是整个数据流。
-
因为每一次调用onNext()后,都不得不新建一个Observable,所以flatMap()的效率较低。事实上,compose()操作符只在主干数据流上执行操作。
网友评论