美文网首页
Rxjava的基础操作符

Rxjava的基础操作符

作者: 小迷糊_dcee | 来源:发表于2020-10-14 16:58 被阅读0次

Rxjava原理介绍
Rxjava原理 基于 一种扩展的观察者模式
Rxjava的扩展观察者模式中有4个角色:
被观察者(Observable):产生事件
观察者(Observer):接收事件,并给出响应动作
订阅(Subscribe):连接 被观察者 & 观察者
事件(Event):被观察者 & 观察者 沟通的载体

944365-101f852f6f0cd618.png
class RxjavaActivity : AppCompatActivity() {
    private val TAG = "YSL"
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        setContentView(R.layout.activity_rxjava)


        initCreate()
        initJust()
        initFormArray()
        initFromIterable()
        //empty error never一般测试使用
        initEmpty()
        initError()
        initNever()

        //延迟创建
        initTimer()
        initInterval()
        initIntervalRange()
        initRange()
    }

    /**
     * rexjava正常的发送及接受
     */
    @SuppressLint("CheckResult")
    private fun initCreate() {
        var observable = Observable.create(ObservableOnSubscribe<Int> {
            it.onNext(1)
            it.onNext(2)
            it.onNext(3)
            it.onComplete()
        }).subscribe(object : Observer<Int> {
            override fun onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

            override fun onSubscribe(d: Disposable) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            override fun onNext(t: Int) {
                Log.d(TAG, "接收到了事件${t}");
            }

            override fun onError(e: Throwable) {
                Log.d(TAG, "对Error事件作出响应");
            }

        })
    }

    /**
     * 快速创建被观察者observable,
     * 发送:直接发送传入的事件(做多10个参数)
     */
    private fun initJust() {
        Observable.just(1, 2, 3, 4, 5)
            .subscribe(object : Observer<Int> {
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Int) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })
    }


    /**
     * 快速创建被观察者observable
     * 直接发送传入的数组数据(会将数组中的数据转换成observable对象)
     */
    private fun initFormArray() {
        var items = arrayOf(0,1,2,3,4,5)
        Observable.fromArray(items)
            .subscribe(object :Observer<Array<Int>>{
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Array<Int>) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })



    }

    /**
     * 快速创建被观察者observable
     * 直接发送传入集合list数据(会将list数据转成observable对象)
     */
    private fun initFromIterable() {
        var list = ArrayList<Int>()
        list.add(1)
        list.add(2)
        list.add(3)
        list.add(4)
        Observable.fromIterable(list).subscribe(object :Observer<Int>{
            override fun onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

            override fun onSubscribe(d: Disposable) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            override fun onNext(t: Int) {
                Log.d(TAG, "接收到了事件${t}");
            }

            override fun onError(e: Throwable) {
                Log.d(TAG, "对Error事件作出响应");
            }

        })
    }

    /**
     *该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
     * 即观察者接收后会直接调用onCompleted()
     */
    @SuppressLint("CheckResult")
    private fun initEmpty() {
        Observable.empty<Int>()
    }

    /**
     * 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
     *  即观察者接收后会直接调用onError()
     * 可自定义异常
     */
    private fun initError() {
        Observable.error<Exception>(RuntimeException())
    }

    /**
     * 该方法创建的被观察者对象发送事件的特点:不发送任何事件
     * 即观察者接收后什么都不调用
     */
    private fun initNever() {
        Observable.never<String>();

    }

    /**
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
     * 本质 = 延迟指定时间后,调用一次 onNext(0)
     * 延迟指定事件,发送一个0,一般用于检测
     */
    private fun initTimer() {
        Observable.timer(3,TimeUnit.SECONDS)
            .subscribe(object :Observer<Long>{
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Long) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })
    }

    /**
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:每隔指定时间 就发送 事件
     * 发送的事件序列 = 从0开始、无限递增1的的整数序列
     * interval参数说明
     * 参数1 第1次延迟时间
     * 参数2 间隔时间数字
     * 参数3 时间单位
     */
    private fun initInterval() {
        Observable.interval(3,1,TimeUnit.SECONDS)
            .subscribe(object :Observer<Long>{
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Long) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })
    }
    /**
     * 作用类似于Interval,不过添加发送事件数量
     * 参数1  事件序列起始点;
     * 参数2  事件数量;
     * 参数3  第1次事件延迟发送时间;
     * 参数4  间隔时间数字;
     * 参数5  时间单位
     */
    private fun initIntervalRange() {
        Observable.intervalRange(2,10,2,1,TimeUnit.SECONDS)
            .subscribe(object :Observer<Long>{
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Long) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })
    }



    /**
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:连续发送 1个事件序列,可指定范围
     * 发送的事件序列 = 从0开始、无限递增1的的整数序列
     * 作用类似于intervalRange(),但区别在于:无延迟发送事件
     * Range参数
     * 参数1 事件序列起始点(注起点不能为负数)
     * 参数2  事件数量
     */
    private fun initRange() {
        Observable.range(5,10)
            .subscribe(object :Observer<Int>{
                override fun onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }

                override fun onSubscribe(d: Disposable) {
                    Log.d(TAG, "开始采用subscribe连接");
                }

                override fun onNext(t: Int) {
                    Log.d(TAG, "接收到了事件${t}");
                }

                override fun onError(e: Throwable) {
                    Log.d(TAG, "对Error事件作出响应");
                }

            })
    }
    
}

···

相关文章

网友评论

      本文标题:Rxjava的基础操作符

      本文链接:https://www.haomeiwen.com/subject/sxuwpktx.html