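This post introduces some of the basic RxJava creation operators. Each one is shown mainly through code and its printed output, kept short and simple; every example was run on an emulator.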
create()
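The most basic operator: it creates an Observable (the object being observed).
// Imports used by the snippets in this post (RxJava 2, Android)
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // Emit three items, then signal completion
        e.onNext("a");
        e.onNext("b");
        e.onNext("c");
        e.onComplete();
    }
});
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e("yzh", "onsubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e("yzh", "onNext--" + s);
    }

    @Override
    public void onError(Throwable t) {
        Log.e("yzh", "onError--" + t.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e("yzh", "onComplete");
    }
};
// Nothing is emitted until the observer subscribes
observable.subscribe(observer);
Output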
01-30 06:26:45.095 2479-2479/? E/yzh: onsubscribe
01-30 06:26:45.095 2479-2479/? E/yzh: onNext--a
01-30 06:26:45.095 2479-2479/? E/yzh: onNext--b
01-30 06:26:45.096 2479-2479/? E/yzh: onNext--c
01-30 06:26:45.096 2479-2479/? E/yzh: onComplete
just()
Quickly creates an Observable. It replaces a series of onNext() calls and accepts at most 10 arguments.
// Equivalent to calling onNext(1), onNext(2), onNext(3), onNext(4) in a create()-based Observable
Observable.just(1, 2, 3, 4)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("yzh", "onNext--" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError-" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onComplete
fromArray()
Quickly creates an Observable that emits the elements of the array passed in. Unlike just(), it can emit more than 10 items.
Integer[] items = {0, 1, 2, 3};
Observable.fromArray(items)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("yzh", "onNext--" + integer);
            }

            @Override
            public void onError(Throwable e) {
                // Prefix the message so a null getMessage() is never passed to Log.e directly
                Log.e("yzh", "onError--" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output
01-30 06:55:28.353 4879-4879/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onComplete
fromIterable()
Quickly creates an Observable that emits the elements of the collection passed in (a List in this example).
List<String> list = new ArrayList<>();
list.add("A");
list.add("B");
list.add("C");
Observable.fromIterable(list)
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e("yzh", "onNext--" + s);
            }

            @Override
            public void onError(Throwable e) {
                // Prefix the message so a null getMessage() is never passed to Log.e directly
                Log.e("yzh", "onError--" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--A
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--B
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--C
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onComplete
Special-case Observables
// Triggers no observer callbacks other than onSubscribe
// Observable<Object> observable = Observable.never();
// Triggers only onComplete (after onSubscribe)
// Observable<Object> observable = Observable.empty();
// Triggers only onError (after onSubscribe)
Observable<Object> observable = Observable.error(new RuntimeException());
Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e("yzh", "onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.e("yzh", "onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.e("yzh", "onError--" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.e("yzh", "onComplete");
    }
};
// Subscribe to see which callbacks fire for the chosen variant
observable.subscribe(observer);
defer()
Unlike just() and the other factories above, defer() does not create the Observable until subscription time, which ensures that the data it emits is up to date.
// Assumed here: i is an int field of the enclosing class
i = 10;
// Observable<Integer> observable = Observable.just(i);
// At this point the Observable has not been created yet
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        // Runs once per subscription, so it reads the current value of i
        return Observable.just(i);
    }
});
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e("yzh", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.e("yzh", "onNext--" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e("yzh", "onError--" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.e("yzh", "onComlete");
    }
};
i = 20;
// Only on subscription does defer() create the actual Observable
observable.subscribe(observer);
Output
01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onNext--20
01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onComlete
With the commented-out line instead, that is, using just() directly, the output becomes onNext--10.
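The difference between the two variants comes down to when i is read. Here is a minimal plain-Java sketch of that idea, deliberately not using RxJava: a java.util.function.Supplier stands in for the factory passed to defer(), and the names DeferSketch, eagerValue and deferredValue are made up for illustration.

```java
import java.util.function.Supplier;

public class DeferSketch {
    // Plays the role of the field i in the defer() example
    static int i;

    // Like just(i): the value is read once, when the source is built
    static int eagerValue() {
        i = 10;
        final int captured = i; // read now
        i = 20;                 // this later change is not seen
        return captured;
    }

    // Like defer(...): the value is read only when it is asked for
    static int deferredValue() {
        i = 10;
        Supplier<Integer> factory = () -> i; // nothing is read yet
        i = 20;                              // change happens before the "subscription"
        return factory.get();                // read now
    }

    public static void main(String[] args) {
        System.out.println("eager: " + eagerValue());       // eager: 10
        System.out.println("deferred: " + deferredValue()); // deferred: 20
    }
}
```

The eager variant matches the onNext--10 result of just(i), and the deferred variant matches the onNext--20 result of defer().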
timer()
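Quickly creates an Observable that emits after a delay. I tried passing a value along with just(), but the observer never receives it; the only item it gets is 0. In practice, timer() can be used to trigger an action after the delay, but it cannot carry a value of its own. By default timer() runs on RxJava's computation scheduler, which is why onNext and onComplete in the output arrive on a different thread (7695) than onSubscribe (7675).
Observable.timer(2, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                // The value received is 0; an action can be run here, but no argument comes with it
                Log.e("yzh", "onNext--" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError--" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output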
01-30 07:54:32.650 7675-7675/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:54:34.650 7675-7695/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 07:54:34.651 7675-7695/com.stone.appuser.rxtest E/yzh: onComplete
interval()
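Quickly creates an Observable that starts emitting after an initial delay and then keeps emitting at a fixed interval.
It emits an unbounded sequence of integers, starting at 0 and increasing by 1.
// Argument 1: delay before the first emission
// Argument 2: interval between consecutive emissions
// Argument 3: time unit
Observable.interval(3, 1, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                Log.e("yzh", "onNext--" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError--" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output (note the timestamps)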
01-30 08:28:25.580 8710-8710/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 08:28:28.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 08:28:29.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 08:28:30.581 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 08:28:31.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--3
It keeps printing one line per second indefinitely.
intervalRange()
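Quickly creates an Observable. Unlike interval(), it lets you set the starting value and the number of emissions.
// Argument 1: starting value
// Argument 2: number of emissions
// Argument 3: delay before the first emission
// Argument 4: interval between consecutive emissions
// Argument 5: time unit
Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                Log.e("yzh", "onNext--" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError--" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output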
01-30 08:47:51.658 9331-9331/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 08:47:53.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 08:47:54.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 08:47:55.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--5
01-30 08:47:56.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--6
01-30 08:47:57.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--7
01-30 08:47:58.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--8
01-30 08:47:59.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--9
01-30 08:48:00.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--10
01-30 08:48:01.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--11
01-30 08:48:02.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--12
01-30 08:48:02.659 9331-9370/com.stone.appuser.rxtest E/yzh: onComplete
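The timestamps and values in this log follow directly from the four numeric arguments. As a rough illustration, this plain-Java sketch (no RxJava involved; the class IntervalRangeSketch and the method schedule are hypothetical names) computes when each value would be expected, relative to the moment of subscription, assuming delays in seconds.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalRangeSketch {
    // Returns one "offset -> value" line per expected emission.
    // Offsets are seconds since subscription.
    static List<String> schedule(long start, long count, long initialDelay, long period) {
        List<String> lines = new ArrayList<>();
        for (long n = 0; n < count; n++) {
            long offset = initialDelay + n * period; // time since subscription
            long value = start + n;                  // emitted value
            lines.add("t+" + offset + "s -> " + value);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Same numbers as intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
        for (String line : schedule(3, 10, 2, 1)) {
            System.out.println(line);
        }
    }
}
```

For these arguments the first line is t+2s -> 3 and the last is t+11s -> 12, which lines up with the log: onSubscribe at 08:47:51, the value 3 at 08:47:53, and the value 12 at 08:48:02.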
range()
Quickly creates an Observable that emits a consecutive range of integers, all at once and without any delay.
// Argument 1: first value; argument 2: number of values (not the end value)
Observable.range(3, 10)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("yzh", "onNext--" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh", "onError--" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh", "onComplete");
            }
        });
Output
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--5
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--6
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--7
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--8
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--9
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--10
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--11
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--12
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onComplete