美文网首页
RxJava初探-Consumer理解和使用

RxJava初探-Consumer理解和使用

作者: kevinsEegets | 来源:发表于2019-12-10 10:17 被阅读0次

    在上一章我们了解了如何如何用RxJava实现一个简单消息发送,我们先回顾一下上节的代码

    
     Observable.create(ObservableOnSubscribe<Int> {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
            }).subscribe(object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            })
    
    

    如上代码我们看到,每次我们想要通过RxJava实现一个事件时,都需要实现 onSubscribe(), onComplete() onNext(),onError() 我们实际开发中有时候是用不到这么多回调方法的, 有没有更好的方法呢?

    这时候我们就可以用到Consumer和Action了

    我们先看看Consumer源码的解释,很简单

    /**
     * A functional interface (callback) that accepts a single value.
     * @param <T> the value type
     */
    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    
    

    我们看看Consumer源码的解释

    A functional interface (callback) that accepts a single value. 只接受单个值返回接口

    可以看到只有一个方法accept(T t), Consumer参数的方法表示下游只关心onNext事件, 其他的事件我们不考虑

    Consumer的accept()带有一个T参数,负责接收上游发送过来的事件

    我们用Consumer修改一下我们上一章最后的代码

    Observable.create(ObservableOnSubscribe<Int> {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
            }).subscribe(object: Consumer<Int> {
                override fun accept(t: Int?) {
                    debugMsg("msg--->$t")
                }
            })
    

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->1
    D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->2
    D/com.eegegts: .MainActivity$loadRxjava3_1$2--->msg--->3
    

    可以看到我们的代码少了很多, 日志也输出了我们发送的事件

    那问题来了,假如我现在就想用Consumer来关心完成和别的事件呢,其实也是可以的,我们看下图

    2019-12-10_09-50.png

    可以看到subscribe参数支持了很多, 包括我们熟悉的Consumer<T>, Consumer<Throwable>以及Action

    我们再看看Action的源码解释

    /**
     * A functional interface similar to Runnable but allows throwing a checked exception.
     */
    public interface Action {
        /**
         * Runs the action and optionally throws a checked exception.
         * @throws Exception if the implementation wishes to throw a checked exception
         */
        void run() throws Exception;
    }
    

    可以看到有一个可以输出异常的run()方法,定义的是类似Runnable的接口

    2019-12-10_10-00.png

    如上图,我们可以看出此处的Action代表的是onComplete, 我们就明白了,在run()方法中可以执行完成后的操作.我们看看subscribe全部参数的使用例子

    Observable.create (ObservableOnSubscribe<Int>{
                it.onNext(1)
                "".debugMsg("MainActivity", "emit 2")
                it.onNext(2)
                "".debugMsg("MainActivity","emit 3")
                it.onNext(3)
                "".debugMsg("MainActivity","emit complete")
                it.onComplete()
            }).subscribe(object :Consumer<Int> {
                override fun accept(it: Int?) {
                    debugMsg("onNext", it)
                }
            }, object :Consumer<Throwable> {
                override fun accept(it: Throwable?) {
                    debugMsg("throwable",it)
                }
    
            }, object :Action {
                override fun run() {
                    debugMsg("complete")
                }
            }, object :Consumer<Disposable> {
                override fun accept(it: Disposable?) {
                    debugMsg("disposable",it)
                }
            })
    

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava3_2$5--->disposable null
    D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 1
    D/com.eegegts: String--->MainActivity emit 2
    D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 2
    D/com.eegegts: String--->MainActivity emit 3
    D/com.eegegts: .MainActivity$loadRxjava3_2$2--->onNext 3
    D/com.eegegts: String--->MainActivity emit complete
    D/com.eegegts: .MainActivity$loadRxjava3_2$4--->complete
    

    通过日志可以看出,我们的complete, disposable以及throwable都输出了,我们总结得出 Consumer其实可以做Observable的所有事情.

    下一章我们将看看如何用RxJava有哪些线程以及轻松的实现线程切换~

    相关文章

      网友评论

          本文标题:RxJava初探-Consumer理解和使用

          本文链接:https://www.haomeiwen.com/subject/qwvwgctx.html