在上一章我们了解了如何如何用RxJava实现一个简单消息发送,我们先回顾一下上节的代码
Observable.create(ObservableOnSubscribe<Int> {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
}).subscribe(object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
})
如上代码我们看到,每次我们想要通过RxJava实现一个事件时,都需要实现 onSubscribe(), onComplete() onNext(),onError()
我们实际开发中有时候是用不到这么多回调方法的, 有没有更好的方法呢?
这时候我们就可以用到Consumer和Action了
我们先看看Consumer源码的解释,很简单
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
我们看看Consumer源码的解释
A functional interface (callback) that accepts a single value. 只接受单个值返回接口
可以看到只有一个方法accept(T t), Consumer参数的方法表示下游只关心onNext事件, 其他的事件我们不考虑
Consumer的accept()带有一个T参数,负责接收上游发送过来的事件
我们用Consumer修改一下我们上一章最后的代码
Observable.create(ObservableOnSubscribe<Int> {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
}).subscribe(object: Consumer<Int> {
override fun accept(t: Int?) {
debugMsg("msg--->$t")
}
})
日志输出
D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->1
D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->2
D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->3
可以看到我们的代码少了很多, 日志也输出了我们发送的事件
那问题来了,假如我现在就想用Consumer来关心完成和别的事件呢,其实也是可以的,我们看下图
2019-12-10_09-50.png可以看到subscribe参数支持了很多, 包括我们熟悉的Consumer<T>, Consumer<Throwable>以及Action
我们再看看Action的源码解释
/**
* A functional interface similar to Runnable but allows throwing a checked exception.
*/
public interface Action {
/**
* Runs the action and optionally throws a checked exception.
* @throws Exception if the implementation wishes to throw a checked exception
*/
void run() throws Exception;
}
2019-12-10_10-00.png可以看到有一个可以输出异常的run()方法,定义的是类似Runnable的接口
如上图,我们可以看出此处的Action代表的是onComplete, 我们就明白了,在run()方法中可以执行完成后的操作.我们看看subscribe全部参数的使用例子
Observable.create (ObservableOnSubscribe<Int>{
it.onNext(1)
"".debugMsg("MainActivity", "emit 2")
it.onNext(2)
"".debugMsg("MainActivity","emit 3")
it.onNext(3)
"".debugMsg("MainActivity","emit complete")
it.onComplete()
}).subscribe(object :Consumer<Int> {
override fun accept(it: Int?) {
debugMsg("onNext", it)
}
}, object :Consumer<Throwable> {
override fun accept(it: Throwable?) {
debugMsg("throwable",it)
}
}, object :Action {
override fun run() {
debugMsg("complete")
}
}, object :Consumer<Disposable> {
override fun accept(it: Disposable?) {
debugMsg("disposable",it)
}
})
日志输出
D/com.eegegts: .MainActivity$loadRxjava3_2$5--->disposable null
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 1
D/com.eegegts: String--->MainActivity emit 2
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 2
D/com.eegegts: String--->MainActivity emit 3
D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 3
D/com.eegegts: String--->MainActivity emit complete
D/com.eegegts: .MainActivity$loadRxjava3_2$4--->complete
通过日志可以看出,我们的complete, disposable以及throwable都输出了,我们总结得出 Consumer其实可以做Observable的所有事情.
网友评论