1、RxJava概述
RxJava 是一个基于事件流、实现异步操作的库
Rxjava原理基于一种扩展的观察者模式,有4个角色:
被观察者(Observable):产生事件
观察者(Observer):接收事件,并给出响应动作
订阅(Subscribe):连接被观察者 & 观察者
事件(Event):被观察者 & 观察者沟通的载体
添加依赖:
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
2、RxJava基本使用
2-1、订阅
// 1、创建被观察者Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
/**
* 被观察者Observable的subscribe中会使用ObservableEmitter发送事件,观察者响应对应的事件
*/
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
}
});
// 2、创建观察者Observer
Observer<Integer> observer = new Observer<Integer>() {
/**
* 观察者接收事件前,默认最先调用复写 onSubscribe()
*/
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
/**
* 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
/**
* 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
/**
* 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
// 3、当 Observable 被订阅后,观察者的Observer的OnSubscribe方法会自动被调用,被观察者Observable的subscribe方法会被调用
observable.subscribe(observer);
上述代码的效果基本如下:
D/MainActivity: onSubscribe: false
D/MainActivity: subscribe:
2-2、发送事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
// ObservableEmitter 事件发射器,向观察者发送事件
Log.d(TAG, "subscribe: 1 start");
emitter.onNext(1);
Log.d(TAG, "subscribe: 1 end");
Log.d(TAG, "subscribe: 2 start");
emitter.onNext(2);
Log.d(TAG, "subscribe: 2 end");
Log.d(TAG, "subscribe: 3 start");
emitter.onNext(3);
Log.d(TAG, "subscribe: 3 end");
Log.d(TAG, "subscribe: onComplete start");
emitter.onComplete();
Log.d(TAG, "subscribe: onComplete end");
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
observable.subscribe(observer);
代码执行结果
D/MainActivity: onSubscribe: false
D/MainActivity: subscribe:
D/MainActivity: subscribe: 1 start
D/MainActivity: 对Next事件作出响应1
D/MainActivity: subscribe: 1 end
D/MainActivity: subscribe: 2 start
D/MainActivity: 对Next事件作出响应2
D/MainActivity: subscribe: 2 end
D/MainActivity: subscribe: 3 start
D/MainActivity: 对Next事件作出响应3
D/MainActivity: subscribe: 3 end
D/MainActivity: subscribe: onComplete start
D/MainActivity: 对Complete事件作出响应
D/MainActivity: subscribe: onComplete end
2-3、链式调用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
网友评论