rxjava通过创建观察者和被观察者对象进行订阅实现线程切换
引入依赖
implementation 'io.reactivex.rxjava2:rxjava:2.2.0' //Rxjava
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
创建观察者对象
private Observer observer=new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.e("收到的内容",o.toString());
Log.e("接受线程",Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) { //错误
}
@Override
public void onComplete() { //结束
}
};
订阅和切换线程
.observeOn(AndroidSchedulers.mainThread()) //观察者运行的线程
.subscribeOn(Schedulers.io()) //被观察者运行的线程
.subscribe(observer); //将观察者加入订阅
create操作符 创建一个完整的被观察者对象
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e("发送", Thread.currentThread().getName());
emitter.onNext("ceshi"); //发送信息
}
}).subscribe(observer); //进行订阅
just 快速创建一个被观察者对象
Observable.just("asdasd")
.subscribe(observer);
delay 延时发送操作符
// 1.时间 2.时间单位 3.线程调度器
Observable.just("asdasd").delay(3,TimeUnit.SECONDS,Schedulers.io()).
subscribe(observer);
defer 订阅的时候,返回一个观察者对象,以保证数据的时效性
Observable.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return Observable.just("asdasd");
}
})
timer 快速创建一个被观察者对象,发送过去的内容是0
Observable.timer(3,TimeUnit.SECONDS).subscribe(observer);
interval 周期性发送事件,内容从0开始递增
//1.开始事件 2.间隔事件 3.时间单位
Observable.interval(1,1,TimeUnit.SECONDS).subscribe(observer);
range 快速创建一个被观察者对象,指定发送范围
// 1.开始 2.结束
Observable.range(1,10).subscribe(observer);
filter 自定义过滤器 ,西面是过滤掉小于6的数字
Observable.just(6,2,3).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>=6;
}
}) .observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(observer);
网友评论