一、概念
- 响应式编程:是一种基于异步数据流概念的编程模型。
- 数据流:就像一条河,可以监控水位(被观测),过滤垃圾(过滤),消毒成自来水(操作),可以和其他河流合并(合并)成为一条新的河。
- 异步: 区别如AsyncTask,Rxjava好处在于简洁
- RxJava特点
- 易于并发从而更好的利用服务器的能力。
- 易于有条件的异步执行。
- 避免回调地狱。
- 优雅的链式调用
- 观察者模式适用场景
- 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
- 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
- 当一个变化的对象通知那些无须推断具体类型的对象时。
- Rxjava的四个角色
- Observable 实体类
- Observer 接口
- Subscriber 抽象类
- Subjects 抽象类。继承自Observable实现了Observer
Observable和Subjects是两个生产实体,Observer和Subscriber是两个消费实体
- 和传统编程方式的理解
Observable就是数据,Observer就是方法(对数据的操作),
Observable.subscriber(Observer)就是方法的调用
二、Observable
可以和iterator对比理解(如果起初不理解Observable,就把它当做是个iterator)
Event | Iterable(pull) | Observable(push) |
---|---|---|
检索数据 | T next() | onNext(T) |
发现错误 | throws Exception | onError(Throwable) |
完成 | !hasNext() | onComplete() |
- Observale三个回调
- onNext():检索数据
- onComplete(): 完成
- onError(): 发现错误
- Observable的分类
从发射物的角度
- “热”Observable : 一个“热”的Observable典型的只要一创建就开始发射数据,因此所有后续订阅它的观察者可能从序列的中间的某个位置开始接受数据(有一些数据错过了)。
- “冷”Observable :一直等待,直到有观察者订阅它才开始发射数据
- Observable的创建
- Observable.create()
- Observable.from() :将集合中的数据一个一个的发射出来。
- Observable.just() :可以有1到9个参数,按顺序发射。参数可以是列表或数组,不同于from(),会发射整个列表。
- Observable的一些特殊方法
- Observable.empty() 毫无理由的不再发射数据并正常结束。
- Observable.never() 不发射数据并且永远不会结束。
- Observable.throw() 不发射数据并且以错误结束。
三、Subject
Subject = Observable + Observer
- Rxjava提供的四种Subjec
- PublishSubject : subject的基础子类。
- BehaviorSubject : 会首先向它的订阅者发送截止订阅前最新的一个数据,然后正常发送订阅后的数据流。
- ReplaySubject : 会缓存它所订阅的所有数据,向所有订阅它的观察者重发。
- AsyncSubject : 只会发布最后一个数据给已经订阅的每一个观察者。
四、线程控制Schedulers
- StrickMode
- 作用:帮助我们侦测敏感活动,如在主线程执行耗时操作。当出现违规操作时,penaltyLog()会在logcat打印一条信息。
- 设置
@Override
public void onCreate() {
super.onCreate();
if (BuildConfig.DEBUG) {
StrictMode.setThreadPolicy(new StrictMode.ThreadPolicy.Builder().detectAll().penaltyLog().build());
StrictMode.setVmPolicy(new StrictMode.VmPolicy.Builder().detectAll().penaltyLog().build());
}
}
- RxJava提供的五种调度器
- .io() : 用于 I/O操作
- .computation() :计算相关的默认调度器(buffer(),debounce(),delay(),interval(),sample(),skip()),与I/O操作无关。
- .immediate() :立即在当前线程执行你指定的工作。timeout(),timeInterval(),以及timestamp()方法默认调度器。
- .newThread():启动新线程。
- .trampoline() :非立即执行,加入队列按序执行。repeat(),retry()方法的默认调度器。
- subscribeOn()和ObserveOn()
- subscribeOn()指定Observable所在的线程。
- observerOn()指定Observer所在的线程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
在io线程操作数据,在主线程更新UI
五、需要注意的事情
- 背压
事件产生的速度比消费快。发生 overproucing 后,当链式结构不能承受数据压力时,就会抛出 MissingBackpressureException异常。
onBackpressureBuffer
onBackpressureDrop
六、操作符列表
官方文档列出的操作符,共393个。同学们奋斗吧……,接下来的博客会具体介绍
Aggregate
**All
**
**Amb
**
and_
**And
**
Any
apply
as_blocking
AsObservable
AssertEqual
asyncAction
asyncFunc
**Average
**
averageDouble
averageFloat
averageInteger
averageLong
blocking
**Buffer
**
bufferWithCount
bufferWithTime
bufferWithTimeOrCount
byLine
cache
case
Cast
**Catch
**
catchException
collect
collect
(RxScala version of **Filter
**)
**CombineLatest
**
combineLatestWith
**Concat
**
concat_all
concatMap
concatMapObserver
concatAll
concatWith
**Connect
**
connect_forever
cons
**Contains
**
controlled
**Count
**
countLong
**Create
**
cycle
**Debounce
**
decode
**DefaultIfEmpty
**
**Defer
**
deferFuture
**Delay
**
delaySubscription
delayWithSelector
**Dematerialize
**
**Distinct
**
DistinctUntilChanged
**Do
**
doAction
doOnCompleted
doOnEach
doOnError
doOnRequest
doOnSubscribe
doOnTerminate
doOnUnsubscribe
doseq
doWhile
drop
dropRight
dropUntil
dropWhile
**ElementAt
**
ElementAtOrDefault
**Empty
**
empty?
encode
ensures
error
every
exclusive
exists
expand
failWith
**Filter
**
filterNot
Finally
finallyAction
finallyDo
find
findIndex
**First
**
FirstOrDefault
firstOrElse
**FlatMap
**
flatMapFirst
flatMapIterable
flatMapIterableWith
flatMapLatest
flatMapObserver
flatMapWith
flatMapWithMaxConcurrent
flat_map_with_index
flatten
flattenDelayError
foldl
foldLeft
for
forall
ForEach
forEachFuture
forIn
forkJoin
**From
**
fromAction
fromArray
FromAsyncPattern
fromCallable
fromCallback
FromEvent
FromEventPattern
fromFunc0
from_future
from_iterable
from_list
fromNodeCallback
fromPromise
fromRunnable
Generate
generateWithAbsoluteTime
generateWithRelativeTime
generator
GetEnumerator
getIterator
**GroupBy
**
GroupByUntil
GroupJoin
head
headOption
headOrElse
if
ifThen
**IgnoreElements
**
indexOf
interleave
interpose
**Interval
**
into
isEmpty
items
**Join
**
join
(string)
jortSort
jortSortUntil
**Just
**
keep
keep-indexed
**Last
**
lastOption
LastOrDefault
lastOrElse
Latest
latest
(Rx.rb version of **Switch
**)
length
let
letBind
limit
LongCount
ManySelect
**Map
**
map
(RxClojure version of **Zip
**)
MapCat
mapCat
(RxClojure version of **Zip
**)
map-indexed
map_with_index
**Materialize
**
**Max
**
MaxBy
**Merge
**
mergeAll
merge_concurrent
mergeDelayError
mergeObservable
mergeWith
**Min
**
MinBy
MostRecent
Multicast
nest
**Never
**
Next
Next
(BlockingObservable version)
none
nonEmpty
nth
**ObserveOn
**
ObserveOnDispatcher
observeSingleOn
of
of_array
ofArrayChanges
of_enumerable
of_enumerator
ofObjectChanges
OfType
ofWithScheduler
onBackpressureBlock
onBackpressureBuffer
onBackpressureDrop
OnErrorResumeNext
onErrorReturn
onExceptionResumeNext
orElse
pairs
pairwise
partition
partition-all
pausable
pausableBuffered
pluck
product
**Publish
**
PublishLast
publish_synchronized
publishValue
raise_error
**Range
**
**Reduce
**
reductions
**RefCount
**
**Repeat
**
repeat_infinitely
repeatWhen
**Replay
**
rescue_error
rest
**Retry
**
retry_infinitely
retryWhen
Return
returnElement
returnValue
runAsync
**Sample
**
**Scan
**
scope
Select
(alternate name of **Map
**)
select
(alternate name of **Filter
**)
selectConcat
selectConcatObserver
SelectMany
selectManyObserver
select_switch
selectSwitch
selectSwitchFirst
selectWithMaxConcurrent
select_with_index
seq
**SequenceEqual
**
sequence_eql?
SequenceEqualWith
**Serialize
**
share
shareReplay
shareValue
Single
SingleOrDefault
singleOption
singleOrElse
size
**Skip
**
**SkipLast
**
skipLastWithTime
**SkipUntil
**
skipUntilWithTime
**SkipWhile
**
skip_while_with_index
skip_with_time
slice
sliding
slidingBuffer
some
sort
sort-by
sorted-list-by
split
split-with
**Start
**
startAsync
startFuture
**StartWith
**
stringConcat
stopAndWait
subscribe
**SubscribeOn
**
SubscribeOnDispatcher
subscribeOnCompleted
subscribeOnError
subscribeOnNext
**Sum
**
sumDouble
sumFloat
sumInteger
sumLong
**Switch
**
switchCase
switchIfEmpty
switchLatest
switchMap
switchOnNext
Synchronize
**Take
**
take_with_time
takeFirst
**TakeLast
**
takeLastBuffer
takeLastBufferWithTime
takeLastWithTime
takeRight
(see also: **TakeLast
**)
**TakeUntil
**
takeUntilWithTime
**TakeWhile
**
take_while_with_index
tail
tap
tapOnCompleted
tapOnError
tapOnNext
**Then
**
thenDo
Throttle
throttleFirst
throttleLast
throttleWithSelector
throttleWithTimeout
**Throw
**
throwError
throwException
**TimeInterval
**
**Timeout
**
timeoutWithSelector
**Timer
**
**Timestamp
**
**To
**
to_a
ToArray
ToAsync
toBlocking
toBuffer
to_dict
ToDictionary
ToEnumerable
ToEvent
ToEventPattern
ToFuture
to_h
toIndexedSeq
toIterable
toIterator
ToList
ToLookup
toMap
toMultiMap
ToObservable
toSet
toSortedList
toStream
ToTask
toTraversable
toVector
tumbling
tumblingBuffer
unsubscribeOn
**Using
**
**When
**
Where
while
whileDo
**Window
**
windowWithCount
windowWithTime
windowWithTimeOrCount
windowed
withFilter
withLatestFrom
**Zip
**
zipArray
zipWith
zipWithIndex
++
+:
:+
网友评论