响应式编程是一种基于异步数据流概念的编程模式。数据就像一条河流:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们更改电子表(变化的传播)中的一些数值时,我们需要更新整个表格或者我们的机器人碰到墙时会拐弯(响应事件)。
今天,响应式编程最通用的一个场景是UI:我们的移动 App 必须做出对网络调用,用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。
Rx 是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。开发者可以使用 Observables 模拟异步数据流。
Rx 让众所周知的概念变得易于实现和消费,例如 push 方法。在响应式的世界里,我们不能假装用户不关注或者是不抱怨它而一味的等待函数的返回结果,网络调用,或者数据库查询的返回结果。我们时刻都在等待着某些东西,这就让我们失去了并行处理其它事情的机会,提供更好的用户体验,让我们的软件免受顺序链的影响,而阻塞编程。
push 方法把这个问题逆转了:取而代之的是不再等待结果,开发者只是简单的请求结果,而当它返回时得到一个通知即可。开发者对即将发生的事件提供一个清晰的响应链。对于每一个事件,开发者都做出相应的响应。(不再等待结果,而是结果返回时通知他。这期间他可以做任何事情)。
RxJava 特点
- 易于并发从而更好的利用服务器的能力。
- 易于有条件的异步执行。
- 一种更好的方式来避免回调地狱。
- 一种响应式方法。
Observables 和 Iterables 共用一个相似的 API:我们在Iterable可以执行的许多操作也可以同样用在Observables上执行。当然,由于Obserables流的本质,没有如Iterable.remove()这样相应的方法。
Pattern | 一个返回值 | 多个返回值 |
---|---|---|
Synchronous | T getData() | Iterable<T> |
Asynchronous | Future<T> getData() | Observable<T> getData |
RxJava世界里的四种角色
- Observable(生产者)
- Observer(消费者)
- Subscriber(消费者)
- Subjects(生产者)
几个用到的通用方法
private Observer<String> getObserver(final String tag){
return new Observer<String>() {
@Override
public void onCompleted() {
System.out.println(tag + ":" +"completed");
}
@Override
public void onError(Throwable e) {
System.out.println(tag + ":" +"error");
}
@Override
public void onNext(String s) {
System.out.println(tag + ":" +s);
}
};
}
private Observer<Object> getObserverObject(final String tag){
return new Observer<Object>() {
@Override
public void onCompleted() {
System.out.println(tag + ":" +"completed");
}
@Override
public void onError(Throwable e) {
System.out.println(tag + ":" +"error");
}
@Override
public void onNext(Object s) {
System.out.println(tag + ":" +s);
}
};
}
Create Observable
Observable.create()
创建一个 Observable
////////////create////////////
Observable<String> createObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(String.valueOf(i));
}
subscriber.onCompleted();
}
});
Subscription subscription = createObservable.subscribe(getObserver("Observable create"));
subscription.unsubscribe();
output
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:0
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:1
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:2
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:3
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:4
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable create:completed
Observable.from()
从一个列表中不断发送数据。
List<String> list = new ArrayList<>();
list.add(String.valueOf(1));
list.add(String.valueOf(200));
list.add(String.valueOf(300));
list.add(String.valueOf(500));
Observable<String> fromObservable = Observable.from(list);
fromObservable.subscribe(getObserver("Observable from"));
ExecutorService executorService = Executors.newSingleThreadExecutor();
FutureTask<String> future = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "100001";
}
});
executorService.submit(future);
Observable<String> fromObservableFuture = Observable.from(future, 6, TimeUnit.SECONDS);
fromObservableFuture.subscribe(getObserver("fromObservableFuture"));
output
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable from:1
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable from:200
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable from:300
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable from:500
04-21 08:16:30.525 6468-6468/com.zsy.androidtraining I/System.out: Observable from:completed
04-21 08:16:35.534 6468-6468/com.zsy.androidtraining I/System.out: fromObservableFuture:100001
04-21 08:16:35.534 6468-6468/com.zsy.androidtraining I/System.out: fromObservableFuture:completed
Observable.just()
可以把传统的函数转变为 Observable对象。
Observable<String> justObservable = Observable.just(returnString(),"Hello RxJava");
justObservable.subscribe(getObserver("justObservable"));
private String returnString() {
return "Hello world";
}
output
04-21 08:16:35.535 6468-6468/com.zsy.androidtraining I/System.out: justObservable:Hello world
04-21 08:16:35.535 6468-6468/com.zsy.androidtraining I/System.out: justObservable:Hello RxJava
04-21 08:16:35.535 6468-6468/com.zsy.androidtraining I/System.out: justObservable:completed
Observable.empty()/never()/throw()
不再发射数据并正常结束/创建一个不发送数据且永远不会结束的Observable/不发送数据且以错误结束的数据。
String[] strings = {"one", "two", "three"};
Observable.from(strings).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
if(s.equals("two")){
return Observable.empty();
}
return Observable.just(s);
}
}).subscribe(getObserverObject("Observer empty"));
Observable.from(strings).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
if(s.equals("two")){
return Observable.never();
}
return Observable.just(s);
}
}).subscribe(getObserverObject("Observer never"));
Observable.from(strings).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
if(s.equals("two")){
return Observable.error(new RuntimeException("error"));
}
return Observable.just(s);
}
}).subscribe(getObserverObject("Observer error"));
output
04-21 08:26:39.468 6688-6688/com.zsy.androidtraining I/System.out: Observer empty:one
04-21 08:26:39.468 6688-6688/com.zsy.androidtraining I/System.out: Observer empty:three
04-21 08:26:39.468 6688-6688/com.zsy.androidtraining I/System.out: Observer empty:completed
04-21 08:26:39.469 6688-6688/com.zsy.androidtraining I/System.out: Observer never:one
04-21 08:26:39.475 6688-6688/com.zsy.androidtraining I/System.out: Observer never:three
04-21 08:26:39.476 6688-6688/com.zsy.androidtraining I/System.out: Observer error:one
04-21 08:26:39.476 6688-6688/com.zsy.androidtraining I/System.out: Observer error:error
PublishSubject
///PublishSubject/////
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.subscribe(getObserver("publishSubject"));
publishSubject.onNext("hello");
output
04-21 08:16:35.553 6468-6468/com.zsy.androidtraining I/System.out: publishSubject:hello
BehaviorSubject
向订阅者发送截止订阅前的最新的一个数据对象(或初始值),然后发送正常的数据流
/////////BehaviorSubject/////////
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.subscribe(getObserver("behaviorSubject"));
behaviorSubject.onNext("three");
behaviorSubject.onNext("four");
output
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: behaviorSubject:two
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: behaviorSubject:three
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: behaviorSubject:four
ReplaySubject
缓存所订阅的所有数据,向任意一个订阅它的观察者重发。
////ReplaySubject///////
ReplaySubject<String> replaySubject = ReplaySubject.create();
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.onCompleted();
replaySubject.subscribe(getObserver("first replaySubject"));
replaySubject.subscribe(getObserver("second replaySubject"));
output
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: first replaySubject:one
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: first replaySubject:two
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: first replaySubject:three
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: first replaySubject:completed
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: second replaySubject:one
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: second replaySubject:two
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: second replaySubject:three
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: second replaySubject:completed
AsyncSubject
发布最后一个数据给已订阅的每一个观察者。
///////AsyncSubject//////
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("one");
asyncSubject.onNext("two");
asyncSubject.onNext("three");
asyncSubject.onCompleted();
asyncSubject.subscribe(getObserver("asyncSubject"));
output
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: asyncSubject:three
04-21 08:22:50.304 6468-6468/com.zsy.androidtraining I/System.out: asyncSubject:completed
Method
repeat()
重复发送数据。
Observable.just("one", "two").repeat(3).subscribe(getObserver("Repeat"));
output
04-21 09:41:39.629 7812-7812/com.zsy.androidtraining I/System.out: Repeat:one
04-21 09:41:39.629 7812-7812/com.zsy.androidtraining I/System.out: Repeat:two
04-21 09:41:39.630 7812-7812/com.zsy.androidtraining I/System.out: Repeat:one
04-21 09:41:39.630 7812-7812/com.zsy.androidtraining I/System.out: Repeat:two
04-21 09:41:39.630 7812-7812/com.zsy.androidtraining I/System.out: Repeat:one
04-21 09:41:39.630 7812-7812/com.zsy.androidtraining I/System.out: Repeat:two
04-21 09:41:39.631 7812-7812/com.zsy.androidtraining I/System.out: Repeat:completed
defer()
场景:声明一个Observable但是想推迟这个Observable的创建直到观察者订阅时。
private Observable<String> getString(){
System.out.println("defer ----");
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if(subscriber.isUnsubscribed()) return;
subscriber.onNext("one");
subscriber.onCompleted();
}
});
}
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return getString();
}
}).subscribe(getObserver("deffer"));
output
04-21 09:41:39.633 7812-7812/com.zsy.androidtraining I/System.out: defer ----
04-21 09:41:39.633 7812-7812/com.zsy.androidtraining I/System.out: deffer:one
04-21 09:41:39.633 7812-7812/com.zsy.androidtraining I/System.out: deffer:completed
range()
指定数字X开始,发射N个数字。
Observable.range(10, 3).subscribe(getObserverInteger("Range"));
output
04-21 09:41:39.634 7812-7812/com.zsy.androidtraining I/System.out: Range:10
04-21 09:41:39.634 7812-7812/com.zsy.androidtraining I/System.out: Range:11
04-21 09:41:39.634 7812-7812/com.zsy.androidtraining I/System.out: Range:12
04-21 09:41:39.634 7812-7812/com.zsy.androidtraining I/System.out: Range:completed
interval()/timer()
interval()定时轮询,需要注意要及时关闭。
timer()定时器,指定时长后执行
final Subscription subscription = Observable.interval(1, TimeUnit.SECONDS).subscribe(getObserverObject("Interval"));
Observable.timer(6, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
System.out.println("Timer : "+aLong);
subscription.unsubscribe();
}
});
output
04-21 09:41:40.647 7812-7831/com.zsy.androidtraining I/System.out: Interval:0
04-21 09:41:41.648 7812-7831/com.zsy.androidtraining I/System.out: Interval:1
04-21 09:41:42.647 7812-7831/com.zsy.androidtraining I/System.out: Interval:2
04-21 09:41:43.647 7812-7831/com.zsy.androidtraining I/System.out: Interval:3
04-21 09:41:44.646 7812-7831/com.zsy.androidtraining I/System.out: Interval:4
04-21 09:41:45.647 7812-7831/com.zsy.androidtraining I/System.out: Interval:5
04-21 09:41:45.649 7812-7832/com.zsy.androidtraining I/System.out: Timer : 0
Filter(过滤序列)
filter()
数据过滤,过滤掉不符合条件的数据。
Integer[] ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
Observable.from(ints).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).subscribe(getObserverObject("filter"));
output
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: filter:2
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: filter:4
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: filter:6
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: filter:8
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: filter:completed
take()/takeLast()/first()/last()/skip()/skipLast()/elementAt()
Integer[] ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
Observable.from(ints).take(4).subscribe(getObserverObject("take"));
Observable.from(ints).takeLast(4).subscribe(getObserverObject("takeLast"));
Observable.from(ints).first().subscribe(getObserverObject("first"));
Observable.from(ints).last().subscribe(getObserverObject("last"));
Observable.from(ints).skip(3).subscribe(getObserverObject("skip"));
Observable.from(ints).skipLast(4).subscribe(getObserverObject("skipLast"));
Observable.from(ints).elementAt(4).subscribe(getObserverObject("elementAt"));
output
04-21 12:33:46.390 12417-12417/com.zsy.androidtraining I/System.out: take:1
04-21 12:33:46.391 12417-12417/com.zsy.androidtraining I/System.out: take:2
04-21 12:33:46.391 12417-12417/com.zsy.androidtraining I/System.out: take:3
04-21 12:33:46.391 12417-12417/com.zsy.androidtraining I/System.out: take:4
04-21 12:33:46.391 12417-12417/com.zsy.androidtraining I/System.out: take:completed
04-21 12:33:46.392 12417-12417/com.zsy.androidtraining I/System.out: takeLast:7
04-21 12:33:46.392 12417-12417/com.zsy.androidtraining I/System.out: takeLast:8
04-21 12:33:46.392 12417-12417/com.zsy.androidtraining I/System.out: takeLast:9
04-21 12:33:46.392 12417-12417/com.zsy.androidtraining I/System.out: takeLast:9
04-21 12:33:46.392 12417-12417/com.zsy.androidtraining I/System.out: takeLast:completed
04-21 12:33:46.401 12417-12417/com.zsy.androidtraining I/System.out: first:1
04-21 12:33:46.401 12417-12417/com.zsy.androidtraining I/System.out: first:completed
04-21 12:33:46.407 12417-12417/com.zsy.androidtraining I/System.out: last:9
04-21 12:33:46.407 12417-12417/com.zsy.androidtraining I/System.out: last:completed
04-21 12:33:46.407 12417-12417/com.zsy.androidtraining I/System.out: skip:4
04-21 12:33:46.407 12417-12417/com.zsy.androidtraining I/System.out: skip:5
04-21 12:33:46.407 12417-12417/com.zsy.androidtraining I/System.out: skip:6
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skip:7
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skip:8
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skip:9
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skip:9
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skip:completed
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:1
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:2
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:3
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:4
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:5
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:6
04-21 12:33:46.408 12417-12417/com.zsy.androidtraining I/System.out: skipLast:completed
04-21 12:33:46.409 12417-12417/com.zsy.androidtraining I/System.out: elementAt:5
04-21 12:33:46.409 12417-12417/com.zsy.androidtraining I/System.out: elementAt:completed
distinct()
去重,去掉重复的数据,作用于一个完整的序列。亲测,是根据hashCode的值来判断是否是同一条“数据”。
Observable.from(Info.generateTestList(5)).distinct().subscribe(getObserverObject("distinct"));
static class Info implements Comparable<Info>{
private String name;
private String address;
public Info(String name, String address) {
this.name = name;
this.address = address;
}
@Override
public String toString() {
return "Info{" +
"name='" + name + '\'' +
", address='" + address + '\'' +
'}';
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Info)) {
return false;
}
if (this.name == null) {
return false;
}
Info info = (Info) obj;
return this.name.equals(info.name);
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + address.hashCode();
return result;
}
public static List<Info> generateTestList(int size) {
List<Info> list = new ArrayList<>();
list.add(new Info(String.valueOf("name = " + 0), String.valueOf("address = " + 0)));
for (int i = 0; i < size; i++) {
list.add(new Info(String.valueOf("name = " + i), String.valueOf("address = " + i)));
}
return list;
}
@Override
public int compareTo(@NonNull Info o) {
return o.name.compareTo(this.name);
}
}
output
04-21 12:33:46.395 12417-12417/com.zsy.androidtraining I/System.out: distinct:Info{name='name = 0', address='address = 0'}
04-21 12:33:46.395 12417-12417/com.zsy.androidtraining I/System.out: distinct:Info{name='name = 1', address='address = 1'}
04-21 12:33:46.395 12417-12417/com.zsy.androidtraining I/System.out: distinct:Info{name='name = 2', address='address = 2'}
04-21 12:33:46.395 12417-12417/com.zsy.androidtraining I/System.out: distinct:Info{name='name = 3', address='address = 3'}
04-21 12:33:46.395 12417-12417/com.zsy.androidtraining I/System.out: distinct:Info{name='name = 4', address='address = 4'}
可以看到去掉了重复的第一条数据。依据 hashCode 来判断!!!
distinctUntilChanged()
去掉重复数据,且前后数据不一致。栗子:有一个温度传感器,每秒发送室内温度:21,21,21,23,24,24,24,24,21,23 ...,考虑到系统性能。我们不想每次值一样时更新数据。我们想去掉重复数据且数据发生改变时发出通知。这就是一个应用场景。
final Subscription subscription = Observable.interval(1,TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(Long aLong) {
return Observable.just(random.nextInt(5));
}
}).distinctUntilChanged().subscribe(getObserverObject("distinctUntilChanged"));
Observable.timer(30, TimeUnit.SECONDS).doOnCompleted(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
}).subscribe();
output
04-21 12:51:34.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:3
04-21 12:51:36.784 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:0
04-21 12:51:37.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:4
04-21 12:51:38.782 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:0
04-21 12:51:39.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:1
04-21 12:51:40.782 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:0
04-21 12:51:42.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:4
04-21 12:51:43.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:0
04-21 12:51:44.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:1
04-21 12:51:45.785 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:2
04-21 12:51:46.783 12776-12795/com.zsy.androidtraining I/System.out: distinctUntilChanged:4
.........
可以看到上面没有出现连续重复的数据,记住是没有“连续重复”,但是可以有重复。结合上面的应用场景!
sampling()
继续考虑上面的应用场景,我们考虑上面的温度发送速率太快了,其实我们并不用获取数据那么频繁,我们只需要取某段时间间隔内的最后一条数据就足够了。所以我们现在每3s采一次样。(记住最后发送的是样期内最后一条数据,即最新一条)
final Random random = new Random();
final Subscription subscription = Observable.interval(1,TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(Long aLong) {
return Observable.just(random.nextInt(5));
}
}).sample(3, TimeUnit.SECONDS).distinctUntilChanged().subscribe(getObserverObject("distinctUntilChanged"));
Observable.timer(30, TimeUnit.SECONDS).doOnCompleted(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
}).subscribe();
output
和上面类似,只不过发送过来的数据会少一些。
throttleFirst()
和上面一样的原理,只不过是样期内第一条数据。
timeout()
继续温度采集样例,正常情况下,我们的数据是不停的发射的。但是假设一段时间间隔内,没有数据发送过来。那么我们可以推断传感器可能坏了,所以我们要发出警告信息,即抛出异常。所以我们需要设置一个超时时长,超过时长,发出警报!Timeout便适合这个应用场景。
final Subscription subscription = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0; i<20; i++){
if(i == 3){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
}).timeout(2, TimeUnit.SECONDS).subscribe(getObserverObject("timeout"));
Observable.timer(30, TimeUnit.SECONDS).doOnCompleted(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
}).subscribe();
output
04-21 12:54:37.463 13019-13019/com.zsy.androidtraining I/System.out: timeout:0
04-21 12:54:37.465 13019-13019/com.zsy.androidtraining I/System.out: timeout:1
04-21 12:54:37.465 13019-13019/com.zsy.androidtraining I/System.out: timeout:2
04-21 12:54:39.466 13019-13038/com.zsy.androidtraining I/System.out: timeout:error
debounce()
解决数据发射速率过快的问题,如果在指定时间间隔内没有发送数据,那么它将发射最后一条数据。
final Subscription subscription = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0; i<30; i++){
if(i % 10 == 0){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
}).debounce(2, TimeUnit.SECONDS).subscribe(getObserverObject("debounce"));
Observable.timer(30, TimeUnit.SECONDS).doOnCompleted(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
}).subscribe();
output
04-21 12:33:51.431 12417-12436/com.zsy.androidtraining I/System.out: debounce:9
04-21 12:33:54.429 12417-12436/com.zsy.androidtraining I/System.out: debounce:19
04-21 12:33:57.430 12417-12436/com.zsy.androidtraining I/System.out: debounce:29
换成 sample结果一样。但是还是有区别的,sample 是每隔固定时间采样,时间间隔的起点不会改变。而debounce的时间间隔的起点是不断变化的,起点是“最近”发送的一条数据!在起点开始的 一段时间内没有数据发射,便会发射最后一条数据。


Map家族
flatMap()
在复杂的场景中,有这样一个Observable: 它发射一个数据序列,这些数据本身也可以发射 Observable。RxJava 的 flatMap 便提供一种铺平的方式,然后合并这些Observables发送的数据,最终合并的结果作为最终的 Observable。注意:在合并的过程中任何一个Observable发生错误,flatMap将会触发自己的onError()方法,并放弃整个链。FlatMap允许交叉,它不能保证有序!
Integer[] ints = {1, 2};
final Integer[] ints2 = {10, 11};
Observable.from(ints).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.merge(Observable.just(integer), Observable.from(ints2));
}
}).subscribe(getObserverObject("flatMap"));
output
04-21 14:23:58.188 16636-16636/com.zsy.androidtraining I/System.out: flatMap:1
04-21 14:23:58.189 16636-16636/com.zsy.androidtraining I/System.out: flatMap:10
04-21 14:23:58.189 16636-16636/com.zsy.androidtraining I/System.out: flatMap:11
04-21 14:23:58.190 16636-16636/com.zsy.androidtraining I/System.out: flatMap:2
04-21 14:23:58.190 16636-16636/com.zsy.androidtraining I/System.out: flatMap:10
04-21 14:23:58.190 16636-16636/com.zsy.androidtraining I/System.out: flatMap:11
04-21 14:23:58.190 16636-16636/com.zsy.androidtraining I/System.out: flatMap:completed
contactMap()
解决flatMap()的交叉问题,能够把发射的值连续在一起的铺平函数,而不是合并。即数据有序?
Observable.from(ints).concatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return Observable.merge(Observable.just(integer), Observable.from(ints2));
}
}).subscribe(getObserverObject("contactMap"));
output
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:1
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:10
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:11
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:2
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:10
04-21 14:23:58.194 16636-16636/com.zsy.androidtraining I/System.out: contactMap:11
04-21 14:23:58.195 16636-16636/com.zsy.androidtraining I/System.out: contactMap:completed
flatMapIterable()
这个没搞懂用途,但是大概是HashMap的映射场景?直接看例子吧。
private List<Integer> getIntegerInteger(int integer) {
List<Integer> list = new ArrayList<>();
if (integer == 1) {
list.add(11);
list.add(12);
} else {
list.add(21);
list.add(22);
}
return list;
}
Observable.from(ints).flatMapIterable(new Func1<Integer, Iterable<?>>() {
@Override
public Iterable<?> call(Integer integer) {
return getIntegerInteger(integer);
}
}).subscribe(getObserverObject("flatMapIterable"));
output
04-21 14:23:58.197 16636-16636/com.zsy.androidtraining I/System.out: flatMapIterable:11
04-21 14:23:58.197 16636-16636/com.zsy.androidtraining I/System.out: flatMapIterable:12
04-21 14:23:58.197 16636-16636/com.zsy.androidtraining I/System.out: flatMapIterable:21
04-21 14:23:58.197 16636-16636/com.zsy.androidtraining I/System.out: flatMapIterable:22
04-21 14:23:58.197 16636-16636/com.zsy.androidtraining I/System.out: flatMapIterable:completed
switchMap()
和 flatMap()很像,不同的是每当Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。
Observable.from(ints).switchMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
Observable observable;
if (integer == 1) {
observable = Observable.timer(2, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<?>>() {
@Override
public Observable<?> call(Long aLong) {
return Observable.just("first ");
}
});
} else {
observable = Observable.just("second ");
}
return observable;
}
}).subscribe(getObserverObject("switchMap"));
output
04-21 14:23:58.217 16636-16636/com.zsy.androidtraining I/System.out: switchMap:second
04-21 14:23:58.217 16636-16636/com.zsy.androidtraining I/System.out: switchMap:completed
可以看到第一个(integer == 1)的Observable已经被取消订阅了。这里采用延时策略是为了方便观察!否则会看到打印两条结果。
scan()
可以看成是一个累积函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。
nteger[] mm = {1,3,4,5};
Observable.from(mm).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2 ;
}
}).distinct().subscribe(getObserverObject("scan"));
Observable.from(mm).scan(10, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2 ;
}
}).distinct().subscribe(getObserverObject("scan"));
output
04-21 14:23:58.229 16636-16636/com.zsy.androidtraining I/System.out: scan:1
04-21 14:23:58.229 16636-16636/com.zsy.androidtraining I/System.out: scan:4
04-21 14:23:58.229 16636-16636/com.zsy.androidtraining I/System.out: scan:8
04-21 14:23:58.229 16636-16636/com.zsy.androidtraining I/System.out: scan:13
04-21 14:23:58.229 16636-16636/com.zsy.androidtraining I/System.out: scan:completed
04-21 14:23:58.237 16636-16636/com.zsy.androidtraining I/System.out: scan:10
04-21 14:23:58.238 16636-16636/com.zsy.androidtraining I/System.out: scan:11
04-21 14:23:58.238 16636-16636/com.zsy.androidtraining I/System.out: scan:14
04-21 14:23:58.238 16636-16636/com.zsy.androidtraining I/System.out: scan:18
04-21 14:23:58.238 16636-16636/com.zsy.androidtraining I/System.out: scan:23
04-21 14:23:58.238 16636-16636/com.zsy.androidtraining I/System.out: scan:completed
可以设置一个初始值。可以看到设置了初始值的 打印了5条数据,相当于{10, 1, 3, 4, 5}
groupBy()
groupBy()
将数据分组。
Observable<GroupedObservable<String, AppInfo>> groupItems = Observable.from(AppInfo.generateTestList()).groupBy(new Func1<AppInfo, String>() {
@Override
public String call(AppInfo appInfo) {
return appInfo.name;
}
});
Observable.concat(groupItems).subscribe(getObserverObject("groupBy"));
static class AppInfo {
private String name;
private String version;
public AppInfo(String name, String version) {
this.name = name;
this.version = version;
}
public static List<AppInfo> generateTestList(){
List<AppInfo> list = new ArrayList<>();
list.add(new AppInfo("qq","10.0"));
list.add(new AppInfo("alipay","0.0"));
list.add(new AppInfo("qq","10.2"));
list.add(new AppInfo("youku","1.2"));
list.add(new AppInfo("youku","1.0"));
list.add(new AppInfo("wechat","1.0"));
list.add(new AppInfo("qq","11.0"));
return list;
}
@Override
public String toString() {
return "AppInfo{" +
"name='" + name + '\'' +
", version='" + version + '\'' +
'}';
}
}
output
04-21 15:40:47.532 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='qq', version='10.0'}
04-21 15:40:47.532 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='qq', version='10.2'}
04-21 15:40:47.532 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='qq', version='11.0'}
04-21 15:40:47.534 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='alipay', version='0.0'}
04-21 15:40:47.534 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='youku', version='1.2'}
04-21 15:40:47.534 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='youku', version='1.0'}
04-21 15:40:47.534 18619-18619/com.zsy.androidtraining I/System.out: groupBy:AppInfo{name='wechat', version='1.0'}
04-21 15:40:47.535 18619-18619/com.zsy.androidtraining I/System.out: groupBy:completed
可以看到数据被分组了。
buffer()
buffer将原函数变换成一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。buffer(count, skip)允许你每隔skip项,填充缓冲区。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 11; i++) {
try {
Thread.sleep(1000);
subscriber.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).buffer(3).subscribe(getObserverObject("buffer"));
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
subscriber.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).buffer(2, 3).subscribe(getObserverObject("buffer and skip"));
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
subscriber.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).buffer(1500, TimeUnit.MILLISECONDS).subscribe(getObserverObject("buffer and timespan"));
output
04-21 16:12:19.244 19879-19879/com.zsy.androidtraining I/System.out: buffer:[0, 1, 2]
04-21 16:12:22.247 19879-19879/com.zsy.androidtraining I/System.out: buffer:[3, 4, 5]
04-21 16:12:25.251 19879-19879/com.zsy.androidtraining I/System.out: buffer:[6, 7, 8]
04-21 16:12:27.252 19879-19879/com.zsy.androidtraining I/System.out: buffer:[9, 10]
04-21 16:12:27.252 19879-19879/com.zsy.androidtraining I/System.out: buffer:completed
04-21 16:12:29.253 19879-19879/com.zsy.androidtraining I/System.out: buffer and skip:[0, 1]
04-21 16:12:32.255 19879-19879/com.zsy.androidtraining I/System.out: buffer and skip:[3, 4]
04-21 16:12:35.256 19879-19879/com.zsy.androidtraining I/System.out: buffer and skip:[6, 7]
04-21 16:12:37.257 19879-19879/com.zsy.androidtraining I/System.out: buffer and skip:[9]
04-21 16:12:37.257 19879-19879/com.zsy.androidtraining I/System.out: buffer and skip:completed
04-21 16:12:38.761 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[0]
04-21 16:12:40.261 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[1]
04-21 16:12:41.763 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[2, 3]
04-21 16:12:43.260 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[4]
04-21 16:12:44.761 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[5, 6]
04-21 16:12:46.263 19879-19901/com.zsy.androidtraining I/System.out: buffer and timespan:[7]
04-21 16:12:47.266 19879-19879/com.zsy.androidtraining I/System.out: buffer and timespan:[8, 9]
04-21 16:12:47.266 19879-19879/com.zsy.androidtraining I/System.out: buffer and timespan:completed
注意上面的 buffer(3, 2) skip < count 所以数据会重复输出,如果 skip > count 则会有数据被漏掉。buffer(1500,TimeUnit.MILLISECONDS)每隔一段时间发射一个列表。
window()
window() 和 buffer() 很像,但是它发射的是 Observable 而不是列表。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0; i<5; i++){
try {
Thread.sleep(1000);
subscriber.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
}).window(2).subscribe(new Observer<Observable<Integer>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable.subscribe(getObserverInteger("testWindow"));
}
});
output
04-21 16:12:48.270 19879-19879/com.zsy.androidtraining I/System.out: testWindow:0
04-21 16:12:49.272 19879-19879/com.zsy.androidtraining I/System.out: testWindow:1
04-21 16:12:49.272 19879-19879/com.zsy.androidtraining I/System.out: testWindow:completed
04-21 16:12:50.273 19879-19879/com.zsy.androidtraining I/System.out: testWindow:2
04-21 16:12:51.273 19879-19879/com.zsy.androidtraining I/System.out: testWindow:3
04-21 16:12:51.273 19879-19879/com.zsy.androidtraining I/System.out: testWindow:completed
04-21 16:12:52.274 19879-19879/com.zsy.androidtraining I/System.out: testWindow:4
04-21 16:12:52.274 19879-19879/com.zsy.androidtraining I/System.out: testWindow:completed
cast()
类型转换
组合 Observables
merge()
在异步世界的场景中会有这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava 的 merge() 方法可以帮你把多个甚至更多的 Observable 合并到他们发射的数据项里。
Observable.merge(Observable.just(1,3), Observable.just(2,4), Observable.just(5,6))
.subscribe(getObserverObject("merge"));
output
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:1
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:3
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:2
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:4
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:5
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:6
04-21 16:38:03.582 20143-20143/com.zsy.androidtraining I/System.out: merge:completed
当合并过程中每个 Observable 抛出错误将会打断合并。如果需要避免这种情况,使用 Observable.mergeDelayError() 方法。所有的 Observables 都完成时,mergeDelayError()将会发射 onError()。
zip()
zip() 可以合并两个或者多个 Observables 发射出的数据项,根据指定的函数Func 变换它们,并发射一个新值。
Observable.zip(Observable.just(1, 2), Observable.just("a", "b", "c"), new Func2<Integer, String, Object>() {
@Override
public Object call(Integer integer, String s) {
return integer + s;
}
}).subscribe(getObserverObject("zip"));
output
04-23 08:47:16.107 2658-2658/com.zsy.androidtraining I/System.out: zip:1a
04-23 08:47:16.107 2658-2658/com.zsy.androidtraining I/System.out: zip:2b
04-23 08:47:16.107 2658-2658/com.zsy.androidtraining I/System.out: zip:completed
join()
这个方法花了好长时间才理解,join()即是把两个Observable的数据组合在一起。与zip()方法有点类似,但是区别在 join() 需要考虑时间。而 zip() 只是两两配对,与时间无关。join() 中发射的每一个数据都有 时间窗口(可以理解为有效期),时间窗口之外数据无效。这个解释起来有点难解释,看具体例子比较好理解。groupJoin()类似。
//原数据源每 2 s 钟发射一个数据
final Observable<String> left = Observable.interval(2, TimeUnit.SECONDS).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
println("left call "+aLong);
return String.valueOf("ss = " + aLong + "--");
}
});
Observable<Integer> right = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0; i<5; i++){
println("right call " + i);
subscriber.onNext(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();//如果注释掉这行代码, 上面的 left call 会一直无限打印。这句代码被调用,整个合并结束
}
});
Func1<String, Observable<Long>> leftDurationSelector = new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(1, TimeUnit.SECONDS);//理解为 left 发射的每一个数据有效期为 1 s
}
};
Func1<Integer, Observable<Long>> rightDurationSelector = new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(4, TimeUnit.SECONDS);//理解为 right 发射的每一个数据有效期为 4 s
}
};
Func2<String, Integer, String> resultSelector = new Func2<String, Integer, String>() {
@Override
public String call(String s, Integer integer) {//有效"重合"的数据组合
return s + integer;
}
};
left.join(right, leftDurationSelector, rightDurationSelector, resultSelector).subscribe(getObserverObject("join"));
/**
*
*
04-23 08:23:36.694 2363-2363/com.zsy.androidtraining I/System.out: right call 0
04-23 08:23:37.696 2363-2363/com.zsy.androidtraining I/System.out: right call 1
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: left call 0
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 0--1
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 0--0
04-23 08:23:38.697 2363-2363/com.zsy.androidtraining I/System.out: right call 2
04-23 08:23:38.697 2363-2363/com.zsy.androidtraining I/System.out: join:ss = 0--2
04-23 08:23:39.698 2363-2363/com.zsy.androidtraining I/System.out: right call 3
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: left call 1
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--0
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--1
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--3
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--2
04-23 08:23:40.699 2363-2363/com.zsy.androidtraining I/System.out: right call 4
04-23 08:23:40.699 2363-2363/com.zsy.androidtraining I/System.out: join:ss = 1--4
...
现在开始分析一下上面的执行过程,假设开始起点时间为 0s ,想象一条起点为 0s 的时间线 ----> 指有效期
right call 0 ----> 0s - 5s
right call 1 ----> 1s - 6s
left call 0 ----> 2s - 3s 此时 和 上面两条数据 有 "重合"时间窗口,打印 join:ss = 0--1 join:ss = 0--0
right call 2 ----> 3s - 7s 和 left call 0 有时间"重合"窗口,打印 join:ss = 0--2
right call 3 ----> 4s - 8s 和 left call 0 没有时间"重合"窗口,所以不会打印
left call 1 ----> 4s - 5s 找上面的"重合",看到所有的 right call 0, 1, 2, 3 都重合 ,所以打印四条数据
下面以此类推,,,
主要是需要有时间"重合"!!!
这里不能保证顺序!!
*
*/
output
04-23 08:23:36.694 2363-2363/com.zsy.androidtraining I/System.out: right call 0
04-23 08:23:37.696 2363-2363/com.zsy.androidtraining I/System.out: right call 1
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: left call 0
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 0--1
04-23 08:23:38.694 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 0--0
04-23 08:23:38.697 2363-2363/com.zsy.androidtraining I/System.out: right call 2
04-23 08:23:38.697 2363-2363/com.zsy.androidtraining I/System.out: join:ss = 0--2
04-23 08:23:39.698 2363-2363/com.zsy.androidtraining I/System.out: right call 3
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: left call 1
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--0
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--1
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--3
04-23 08:23:40.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 1--2
04-23 08:23:40.699 2363-2363/com.zsy.androidtraining I/System.out: right call 4
04-23 08:23:40.699 2363-2363/com.zsy.androidtraining I/System.out: join:ss = 1--4
04-23 08:23:42.692 2363-2382/com.zsy.androidtraining I/System.out: left call 2
04-23 08:23:42.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 2--4
04-23 08:23:42.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 2--3
04-23 08:23:42.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 2--2
04-23 08:23:44.693 2363-2382/com.zsy.androidtraining I/System.out: left call 3
04-23 08:23:44.693 2363-2382/com.zsy.androidtraining I/System.out: join:ss = 3--4
04-23 08:23:44.700 2363-2385/com.zsy.androidtraining I/System.out: join:completed

combineLatest()
结合最近的Observable 发射的数据,可以是多条。可以这样理解,每个数据流的只有最新(最近)push的数据有效。假设有两个数据源,当其中一个数据源发送数据时,其就会去另一个数据源寻找最新的数据组合。记住发送数据时才会触发组合操作!
final Observable<String> observable1 = Observable.interval(2, TimeUnit.SECONDS).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
println("observable1 = "+aLong);
return "observable1 = "+aLong;
}
});
Observable<String> observable2 = Observable.interval(3, TimeUnit.SECONDS).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
println("observable2 = "+aLong);
return "observable2 = "+aLong;
}
});
final Subscription subscription = Observable.combineLatest(observable1, observable2, new Func2<String, String, Object>() {
@Override
public Object call(String s, String s2) {
return s + " ----- " + s2;
}
}).subscribe(getObserverObject("testCombineLatest"));
Observable.timer(10, TimeUnit.SECONDS).doOnCompleted(new Action0() {
@Override
public void call() {
subscription.unsubscribe();
}
}).subscribe();
output
04-23 09:13:44.815 3327-3348/com.zsy.androidtraining I/System.out: observable1 = 0
04-23 09:13:45.816 3327-3349/com.zsy.androidtraining I/System.out: observable2 = 0
04-23 09:13:45.816 3327-3349/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 0 ----- observable2 = 0
04-23 09:13:46.816 3327-3348/com.zsy.androidtraining I/System.out: observable1 = 1
04-23 09:13:46.816 3327-3348/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 1 ----- observable2 = 0
04-23 09:13:48.815 3327-3348/com.zsy.androidtraining I/System.out: observable1 = 2
04-23 09:13:48.815 3327-3348/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 2 ----- observable2 = 0
04-23 09:13:48.816 3327-3349/com.zsy.androidtraining I/System.out: observable2 = 1
04-23 09:13:48.816 3327-3349/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 2 ----- observable2 = 1
04-23 09:13:50.816 3327-3348/com.zsy.androidtraining I/System.out: observable1 = 3
04-23 09:13:50.816 3327-3348/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 3 ----- observable2 = 1
04-23 09:13:51.819 3327-3349/com.zsy.androidtraining I/System.out: observable2 = 2
04-23 09:13:51.820 3327-3349/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 3 ----- observable2 = 2
04-23 09:13:52.815 3327-3348/com.zsy.androidtraining I/System.out: observable1 = 4
04-23 09:13:52.815 3327-3348/com.zsy.androidtraining I/System.out: testCombineLatest:observable1 = 4 ----- observable2 = 2
And/Then/When
暂留位

Switch
暂留位

startWith()
在Observable 开始发射他们的数据之前,startWith() 通过传递一个参数来先发射一个数据序列。
Observable<String> stringObservable = Observable.just("a", "b", "c");
Observable<String> longObservable = Observable.just("1", "2", "3", "4");
Observable.concat(stringObservable, longObservable).subscribe(getObserverObject("contact"));
stringObservable.startWith(longObservable).subscribe(getObserverObject("startWith"));
output
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:a
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:b
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:c
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:1
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:2
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:3
04-23 09:42:07.578 3770-3770/com.zsy.androidtraining I/System.out: contact:4
04-23 09:42:07.580 3770-3770/com.zsy.androidtraining I/System.out: contact:completed
04-23 09:42:07.583 3770-3770/com.zsy.androidtraining I/System.out: startWith:1
04-23 09:42:07.583 3770-3770/com.zsy.androidtraining I/System.out: startWith:2
04-23 09:42:07.584 3770-3770/com.zsy.androidtraining I/System.out: startWith:3
04-23 09:42:07.584 3770-3770/com.zsy.androidtraining I/System.out: startWith:4
04-23 09:42:07.584 3770-3770/com.zsy.androidtraining I/System.out: startWith:a
04-23 09:42:07.585 3770-3770/com.zsy.androidtraining I/System.out: startWith:b
04-23 09:42:07.585 3770-3770/com.zsy.androidtraining I/System.out: startWith:c
04-23 09:42:07.585 3770-3770/com.zsy.androidtraining I/System.out: startWith:completed
这里顺便测试了一下 concat ,其起到合并作用,且有序的合并。
Schedulers
Schedulers 以一种最简单的方式将多线程用在你的 App 中。他们都是 RxJava重要的一部分并能很好地与Observable协同工作。他们无需处理实现,同步,线程,平台限制,平台变化而可以提供一种灵活的方式来创建并发程序。
Schedulers.io()
这个调度器用于 I/O操作。它基于根据需要,增长或缩减来自适应的线程池。需要注意的是线程池是无限制的,大量的 I/O 调度操作将创建许多个线程并占用内存。
Schedulers.computation()
这个是计算工作默认的调度器,与 I/O操作无关。 buffer(), debounce(), interval(), sample(), skip() 等的调度器。
Schedulers.immediate()
这个调度器允许你立即在当前线程执行你指定的工作。它是 timeout(), timeInterval(), timestamp() 的默认调度器。
Schedulers.newThread()
启动一个新线程
Schedulers.trampoline
当我们想在当前线程执行一个任务时,并不是立即,我们可以用 这个方法将它入队。这个调度器会处理它的队列并且按序运行队列中的每一个任务。它是 repeat() 和 retry() 方法默认的调度器。
A Simple Sample
Schedulers.io().createWorker().schedule(new Action0() {
@Override
public void call() {
//耗时操作
}
});
SubscribeOn / ObserveOn
用来和 Observable 一起工作,subscribeOn() 指定 执行代码的线程, observableOn() 指定结果返回时 执行的线程。
Observable.just("a", "b", "c")
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserverObject("Async"));
onBackpressureBuffer()
告诉Observable 发射的数据如果比观察者消费的速度要快的话,它必须把他们存储在缓存中并提供一个合适的时间给它们。
小结
RxJava 还有很多用法,这里只是练习了一些常用的。但这仅仅是练习,要想用好RxJava 需要不断的练习,不断的思考应用场景,这样才能用的得心应手,才能提高开发质量。RxJava 1.0 已经停止维护,RxJava 2.0 已经到来,学习的道路还很长,加油!
网友评论