这可能是最好的 RxJava 2.x 入门教程系列专栏
文章链接:
这可能是最好的 RxJava 2.x 入门教程(完结版)【重磅推出】
这可能是最好的 RxJava 2.x 入门教程(一)
这可能是最好的 RxJava 2.x 入门教程(二)
这可能是最好的 RxJava 2.x 入门教程(三)
这可能是最好的 RxJava 2.x 入门教程(四)
这可能是最好的 RxJava 2.x 入门教程(五)
GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples
前言
最近很多小伙伴私信我,说自己很懊恼,对于 RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。
正题
Single
顾名思义,Single
只会接收一个参数,而 SingleObserver
只会调用 onError()
或者 onSuccess()
。
Single.just(new Random().nextInt())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull Integer integer) {
mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
Log.e(TAG, "single : onSuccess : "+integer+"\n" );
}
@Override
public void onError(@NonNull Throwable e) {
mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
}
});
输出:
distinct
去重操作符,简单的作用就是去重。
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("distinct : " + integer + "\n");
Log.e(TAG, "distinct : " + integer + "\n");
}
});
输出:
很明显,发射器发送的事件,在接收的时候被去重了。
debounce
去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("debounce :" + integer + "\n");
Log.e(TAG,"debounce :" + integer + "\n");
}
});
输出:
代码很清晰,去除发送间隔时间小于 500 毫秒的发射事件,所以 1 和 3 被去掉了。
defer
简单地时候就是每次订阅都会创建一个新的 Observable
,并且如果没有被订阅,就不会产生新的 Observable
。
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1, 2, 3);
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
mRxOperatorsText.append("defer : " + integer + "\n");
Log.e(TAG, "defer : " + integer + "\n");
}
@Override
public void onError(@NonNull Throwable e) {
mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
}
@Override
public void onComplete() {
mRxOperatorsText.append("defer : onComplete\n");
Log.e(TAG, "defer : onComplete\n");
}
});
输出:
last
last
操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
Observable.just(1, 2, 3)
.last(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("last : " + integer + "\n");
Log.e(TAG, "last : " + integer + "\n");
}
});
输出:
merge
merge
顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge
的作用是把多个 Observable
结合起来,接受可变参数,也支持迭代器集合。注意它和 concat
的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("merge :" + integer + "\n");
Log.e(TAG, "accept: merge :" + integer + "\n" );
}
});
输出:
reduce
reduce
操作符每次用一个方法处理一个值,可以有一个 seed
作为初始值。
Observable.just(1, 2, 3)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("reduce : " + integer + "\n");
Log.e(TAG, "accept: reduce : " + integer + "\n");
}
});
输出:
可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。
scan
scan
操作符作用和上面的 reduce
一致,唯一区别是 reduce
是个只追求结果的坏人,而 scan
会始终如一地把每一个步骤都输出。
Observable.just(1, 2, 3)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mRxOperatorsText.append("scan " + integer + "\n");
Log.e(TAG, "accept: scan " + integer + "\n");
}
});
输出:
看日志,没毛病。
window
按照实际划分窗口,将数据发送给不同的 Observable
mRxOperatorsText.append("window\n");
Log.e(TAG, "window\n");
Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
.take(15) // 最多接收15个
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(@NonNull Observable<Long> longObservable) throws Exception {
mRxOperatorsText.append("Sub Divide begin...\n");
Log.e(TAG, "Sub Divide begin...\n");
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
mRxOperatorsText.append("Next:" + aLong + "\n");
Log.e(TAG, "Next:" + aLong + "\n");
}
});
}
});
输出:
写在最后
至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples
做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~
nanchen
网友评论
emitter.onNext(1); // skip 先收到一个1
Thread.sleep(400);
emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
Thread.sleep(505);
emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
Thread.sleep(100);
emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
Thread.sleep(605);
emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
Thread.sleep(510);
emitter.onComplete();
通过Observable只发出最后一个项目(或符合某些条件的最后一个项目)
重点在发出(emit)一词,而不是取出。
Observable.just(a); 此时 just 中的 a 已经初始化了,此后在修改 int a = 2;在订阅时依然会发送 a=1。
使用 defer 就可以将初始化延迟到订阅时,大概就是这个意思吧。
emitter.onNext(1); // skip 先收到一个1
Thread.sleep(400);
emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
Thread.sleep(505);
emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
Thread.sleep(100);
emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
Thread.sleep(605);
emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
Thread.sleep(510);
emitter.onComplete();
比如我按照以下顺序发送数据:1--->2--->3--->4--->5--->6,间隔时间依次为200,350,250,280,290ms,而我的debounce设定的时间为285,那么mirro的observable首先收到了一个1,200ms后收到了2,因为200小于285,所以前面的1会被丢弃掉,现在只有一个2,然后再过350ms后来了3,因为350大于设定时间,所以3会被存储,之后的4是250ms之后来的,所以它会替换掉4...
就是这么一个流程。