RXjava1的依赖
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.3'
RXJava2的依赖
//rxjava2
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
实现功能 :连续发射偶数
RXJava 1的写法
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
for (int i = 0; i < 1000; i++) {
subscriber.onNext(i);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2 == 0;
}
})
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
ToastUtils.showToast(integer + "");
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
RXJava 2的写法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
try {
for (int i = 0; i < 1000; i++) {
subscriber.onNext(i);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
//过滤器
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer i) throws Exception {
return i % 2 == 0;
}
})// .sample(2, TimeUnit.SECONDS) //sample取样
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
ToastUtils.showToast(s + "");
}
});
网友评论