创建操作符
-
作用
创建 被观察者( Observable) 对象 & 发送事件
-
常见类型
-
应用场景 & 对应操作符介绍
- create()
作用
完整创建1个被观察者对象(Observable),RxJava 中创建被观察者对象最基本的操作符.在上次已经使用过了。 - just()
作用
快速创建1个被观察者对象(Observable)最多只能发送10个参数,在上次也已经使用过了。
发送事件的特点:直接发送 传入的事件。 - fromArray()
作用
快速创建1个被观察者对象(Observable),会将数组中的数据转换为Observable对象,可以发送10个以上的参数。
发送事件的特点:直接发送 传入的数组数据,数组元素遍历.
实例
public void fromArrayOperator(View view){
Integer[] items = { 0, 1, 2, 3, 4 };
Observable.fromArray(items).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
- fromIterable()
作用
快速创建1个被观察者对象(Observable)会将集合中的数据转换为Observable对象,发送10个以上事件(集合形式)
发送事件的特点:直接发送 传入的集合List数据,集合元素遍历.
实例
public void fromIterableOperator(View view){
List<String> list=new ArrayList<>();
list.add("A");
list.add("B");
list.add("C");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(String s) {
Log.e("sss", "接收到了事件"+ s);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
4.快速创建额外的操作符
// 下列方法一般用于测试使用
<-- empty() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
// 即观察者接收后会直接调用onCompleted()
public void emptyOperator(View view){
Observable.empty()
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Object o) {
Log.e("sss", "接收到了事件"+ o);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
<-- error() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()
<-- never() -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
5.defer()
作用
直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
通过 Observable工厂方法创建被观察者对象(Observable),每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
实例
Integer i=6;
public void deferOperator(View view){
Observable<Integer> observable=Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(i);
}
});
i=8;
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
6.timer()
作用
快速创建1个被观察者对象(Observable),发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)。
本质 = 延迟指定时间后,调用一次 onNext(0),一般用于检测。
实例
public void timerOperator(View view){
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Long aLong) {
Log.e("sss", "接收到了事件"+ aLong);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
7.interval()
作用
快速创建1个被观察者对象(Observable),发送的事件序列 = 从0开始、无限递增1的的整数序列
发送事件的特点:每隔指定时间 就发送 事件
实例
public void intervalOperator(View view){
Observable.interval(5,2,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Long aLong) {
Log.e("sss", "接收到了事件"+ aLong);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
8.intervalRange()
作用
快速创建1个被观察者对象(Observable),发送的事件序列 = 从0开始、无限递增1的的整数序列,作用类似于interval(),但可指定发送的数据和数量.
发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
实例
public void intervalRangeOperator(View view){
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 每个间隔时间;
// 参数5 = 时间单位
Observable.intervalRange(6,10,5,2,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Long aLong) {
Log.e("sss", "接收到了事件"+ aLong);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
9.range()
作用
快速创建1个被观察者对象(Observable)发送的事件序列 = 从0开始、无限递增1的的整数序列,作用类似于intervalRange(),但区别在于:无延迟发送事件
发送事件的特点:连续发送 1个事件序列,可指定范围
实例
public void range(View view){
// 参数1 事件序列起始点
// 参数2 事件数量
Observable.range(6,10).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Integer integer) {
Log.e("sss", "接收到了事件"+ integer);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
10.rangeLong()
作用:类似于range(),区别在于该方法支持数据类型 = Long
实例
public void rangeLong(View view){
// 参数1 事件序列起始点
// 参数2 事件数量
Observable.rangeLong(6,10).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("sss", "开始采用subscribe连接");
}
@Override
public void onNext(Long along) {
Log.e("sss", "接收到了事件"+ along);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
-
实际开发中的应用
1.不断轮询配合Retrofit查询网络数据
客户端不断的轮询查询网络数据
public void requestPolling(View view){
Observable.interval(2,1,TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("sss","第"+aLong+"次轮询");
Retrofit retrofit=new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRequest_Interface request=retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable=request.getCall();
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Translation translation) {
Log.e("sss", translation.getContent().getOut());
}
@Override
public void onError(Throwable e) {
Log.d("sss", "请求失败");
}
@Override
public void onComplete() {
}
});
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
// Log.e("sss", "接受到事件"+aLong);
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
网友评论