Rxjava原理介绍
Rxjava原理 基于 一种扩展的观察者模式
Rxjava的扩展观察者模式中有4个角色:
被观察者(Observable):产生事件
观察者(Observer):接收事件,并给出响应动作
订阅(Subscribe):连接 被观察者 & 观察者
事件(Event):被观察者 & 观察者 沟通的载体
class RxjavaActivity : AppCompatActivity() {
private val TAG = "YSL"
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_rxjava)
initCreate()
initJust()
initFormArray()
initFromIterable()
//empty error never一般测试使用
initEmpty()
initError()
initNever()
//延迟创建
initTimer()
initInterval()
initIntervalRange()
initRange()
}
/**
* rexjava正常的发送及接受
*/
@SuppressLint("CheckResult")
private fun initCreate() {
var observable = Observable.create(ObservableOnSubscribe<Int> {
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onComplete()
}).subscribe(object : Observer<Int> {
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Int) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 快速创建被观察者observable,
* 发送:直接发送传入的事件(做多10个参数)
*/
private fun initJust() {
Observable.just(1, 2, 3, 4, 5)
.subscribe(object : Observer<Int> {
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Int) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 快速创建被观察者observable
* 直接发送传入的数组数据(会将数组中的数据转换成observable对象)
*/
private fun initFormArray() {
var items = arrayOf(0,1,2,3,4,5)
Observable.fromArray(items)
.subscribe(object :Observer<Array<Int>>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Array<Int>) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 快速创建被观察者observable
* 直接发送传入集合list数据(会将list数据转成observable对象)
*/
private fun initFromIterable() {
var list = ArrayList<Int>()
list.add(1)
list.add(2)
list.add(3)
list.add(4)
Observable.fromIterable(list).subscribe(object :Observer<Int>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Int) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
*该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
* 即观察者接收后会直接调用onCompleted()
*/
@SuppressLint("CheckResult")
private fun initEmpty() {
Observable.empty<Int>()
}
/**
* 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
* 即观察者接收后会直接调用onError()
* 可自定义异常
*/
private fun initError() {
Observable.error<Exception>(RuntimeException())
}
/**
* 该方法创建的被观察者对象发送事件的特点:不发送任何事件
* 即观察者接收后什么都不调用
*/
private fun initNever() {
Observable.never<String>();
}
/**
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
* 本质 = 延迟指定时间后,调用一次 onNext(0)
* 延迟指定事件,发送一个0,一般用于检测
*/
private fun initTimer() {
Observable.timer(3,TimeUnit.SECONDS)
.subscribe(object :Observer<Long>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Long) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:每隔指定时间 就发送 事件
* 发送的事件序列 = 从0开始、无限递增1的的整数序列
* interval参数说明
* 参数1 第1次延迟时间
* 参数2 间隔时间数字
* 参数3 时间单位
*/
private fun initInterval() {
Observable.interval(3,1,TimeUnit.SECONDS)
.subscribe(object :Observer<Long>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Long) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 作用类似于Interval,不过添加发送事件数量
* 参数1 事件序列起始点;
* 参数2 事件数量;
* 参数3 第1次事件延迟发送时间;
* 参数4 间隔时间数字;
* 参数5 时间单位
*/
private fun initIntervalRange() {
Observable.intervalRange(2,10,2,1,TimeUnit.SECONDS)
.subscribe(object :Observer<Long>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Long) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
/**
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:连续发送 1个事件序列,可指定范围
* 发送的事件序列 = 从0开始、无限递增1的的整数序列
* 作用类似于intervalRange(),但区别在于:无延迟发送事件
* Range参数
* 参数1 事件序列起始点(注起点不能为负数)
* 参数2 事件数量
*/
private fun initRange() {
Observable.range(5,10)
.subscribe(object :Observer<Int>{
override fun onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "开始采用subscribe连接");
}
override fun onNext(t: Int) {
Log.d(TAG, "接收到了事件${t}");
}
override fun onError(e: Throwable) {
Log.d(TAG, "对Error事件作出响应");
}
})
}
}
···
网友评论