目录:
1、前言
2、创建
3、订阅
4、线程控制:调度器Scheduler
5、网络请求失败,重试
Android小仙
感谢以下文章提供的指导:
给Android开发者的RxJava详解
RxJava/RxAndroid 使用实例实践
一、前言
1、RxJava到底是做什么的?
异步:
也就是说,RxJava是一个实现异步操作的库
2、同样是做异步,RxJava跟AsyncTask / Handler / XXX / ... 有什么区别?
简洁:
随着程序逻辑变得越来越复杂,依然能够保持简洁。
逻辑简洁
在AndroidStudio中可以实现Lambda化预览
3、目的:“后台处理,前台回调”的异步机制
二、创建
2.1、创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
2.2、创建订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
2.3、关于订阅者与观察者
订阅者内部实现了观察者的基本方法,从某种程度上来讲,订阅者同时扮演了观察者这个角色。
2.4、订阅者与观察者的区别
-
onStart()
: 处理事件还未触发前的准备工作 -
unSubscribe()
: 取消订阅
2.5、创建被观察者
被观察者持有订阅者对象,当事件执行到相应阶段时,调用订阅者的方法以达到提醒的作用。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
-
create
: 是最基本的创造事件序列的方法 -
just(T...)
: 快捷创建事件序列的方法
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
-
from(T[])
/from(Iterable<? extends T>)
:将传入的数组或者Iterable拆分成具体的对象之后,依次执行发送
三、订阅
给 被观察者Observable
配置 观察者Observer/Subscriber
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
其实真的很别扭,按照逻辑来说应该是观察者订阅被观察者,但是从代码上来看却变成了反过来的
!!!
四、线程控制:调度器Scheduler
RxJava遵循的是线程不变的原则,在哪个线程调用subscribe就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
如果需要切换线程,那就需要用到调度器了。
-
Scheduler.immediate()
:在当前线程下执行 -
Scheduler.newThread()
:启动新线程,并在新线程下执行操作 -
Scheduler.io()
:I/O操作【读写文件、读写数据库、网络信息交互等】
比newThread()
效率更高,且不要在io()
中做计算工作 -
Scheduler.computation()
:计算操作。CPU密集型计算 -
AndroidSchedulers.mainThread()
:在主线程执行操作。
4.1、示例
栗子一、
Observable.just(1, 2, 3, 4)
// 指定 subscribe() 发生在 IO 线程
.subscribeOn(Schedulers.io())
// 指定 Subscriber 的回调发生在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
栗子二、
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
五、重试
1、重试类
public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {
public final String TAG = this.getClass().getSimpleName();
private final int maxRetries;
private final int retryDelaySecond;
private int retryCount;
public RetryWithDelay(int maxRetries, int retryDelaySecond) {
this.maxRetries = maxRetries;
this.retryDelaySecond = retryDelaySecond;
}
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable
.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
+ " second, retry count " + retryCount);
return Observable.timer(retryDelaySecond,
TimeUnit.SECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
2、网络请求
HttpApi.api(getActivity()).getAdImage()
// 表示getAdImage执行在I/O线程
.subscribeOn(Schedulers.io())
// 重试,三次,延迟1秒
.retryWhen(new RetryWithDelay(2,1))
// 在开始网络请求前的预操作
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
getActivity().showToast("要准备开始网络请求咯!!!");
}
})
// 表示上面的doOnSubscribe执行在主线程
.subscribeOn(AndroidSchedulers.mainThread())
// 表示下面的观察者执行在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> images) throws Exception {
adImageUrls.clear();
adImageUrls.addAll(images);
getView().setListData(images);
}
// 异常处理
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
getActivity().showToast(getActivity().getString(R.string.net_loading_failed));
}
});
网友评论