rxjava
简单使用
其实rxjava入门并没有想象中的难,常用的简单方法就几个。我们来看一个最简单的例子
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtil.i("XJL", "1");
}
});
这个例子就是利用create创建出来一个被观察者,这个被观察者仅仅只是发送了个数字便结束了,我们需要注意的是,在我们创建出来这个被观察者时,如果没有明确指定一创建就执行的话,它并不会马上执行,只有在subcribe调用时,也就是有观察者进行订阅时才会执行。其中,subscribeOn用来指定被观察者的执行线程,它对前面的被观察者起作用。而observeOn方法用来指定观察者的执行线程,它对后面的subscribe方法起作用。subscribe用来设置我们的观察者,也就是我们对于被观察者发射的数据的处理方法。
当然我们也能够通过rxjava提供的更为便捷的创建方法创建Observable
Observable.just(1, 2, 3).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("XJL", "=====>" + integer );
}
});
这个just就是rxjava提供的遍历函数,它会将我们给定的参数按序发送给下游,类似这样的函数还有很多,这个就是rxjava的简单使用。
rxjava结合retrofit的使用
retrofit作为现在android开发者的利器,当然也和rxjava能够互通有无。下面我们来看看retrofit如何结合rxjava来使用。
retrofit配置
首先我们需要在项目中的build.gradle中引入rxjava依赖,和jakewharton提供的rxjava2和retrofit完美结合的插件。因为retrofit现在还没有完全兼容刚升级不久的rxjava2。
...
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
compile 'android.arch.persistence.room:rxjava2:1.0.0'
...
然后我们需要在构造retrofit实体的时候,添加rxjavaAdapter
return new Retrofit.Builder()
.baseUrl(baseUrl)
.client(createOkHttpClient())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create(createGsonConverter()))
.build();
这个RxJava2CallAdapterFactory就是com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0这个依赖库中所提供的插件。这样,我们最基础的配置就完成了。
对于我们的Interface,最初的使用是返回的OkHttp所需的Call<T>,现在引入了rxjava2,我们可以将返回类型替换为Observable<T>,对于这个Observable的生成是在网络返回时,在adapterFactory通过我们前面配置的rxjavaAdapter来进行转化。
//原始的retrofit请求
@GET("commonUrl")
Call<T> getT();
//Rxjava的retrofit请求
@GET("commonUrl")
Obserable<T> getT();
然后在最外层的调用可以直接通过获取到这个Obserable进行相应的处理.
Client.getT().subscribeOn(Schedulers.io())..observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(T o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
其中onSubscribe回调是在我们执行了subscribe方法之后的回调,onNext是成功返回的回调,onError是网络错误的回调,onComplete是整个流程结束后的返回。
rxjava订阅流程解析
Observable.create方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//将会返回一个ObservableCreate,封装了原始的Observable,
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这个create方法接收一个ObservableOnSubscribe参数,这个ObservableOnSubscribe提供了我们需要的常规操作。
public interface ObservableOnSubscribe<T> {
//通过subscribe的参数ObservableEmitter来发送消息
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
ObservableOnSubscribe是一个接口,需要实现它的subscribe方法,我们在被观察者里面做的操作都是在这个方法里面做处理的,包括了onNext方法,onError,和onComplete的发射。所以,这个方法需要接收一个发射器的参数,我们通过这个发射器来发射数据。这个发射器就是Emitter,ObservableEmitter继承于Emitter,并且提供了几个需要的方法。
//Emitter
public interface Emitter<T> {
//onNext事件,能够有多个
void onNext(@NonNull T value);
//onError事件,仅能有一个,一旦有这个事件,将不会再收到onNext或者onComplete事件
void onError(@NonNull Throwable error);
//onComplete事件,仅能有一个,一旦有这个事件,将不会再收到onNext或者onError事件
void onComplete();
}
//ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
//这个发射器的使能开关
void setDisposable(@Nullable Disposable d);
//这个发射器是否可以取消
void setCancellable(@Nullable Cancellable c);
//停止这个序列
boolean isDisposed();
//序列化这个发射器
@NonNull
ObservableEmitter<T> serialize();
}
对于rxjava的执行,每次执行了一个操作符,都会对原始的Observable做一次封装。比如说create方法,传入ObservableOnSubscribe,经过封装之后,返回的是ObservableCreate类。对于每一个封装类,都需要实现相应的调用方法,也就是当被订阅时需要实现的方法subscribeActual(),这个方法是在Observable中定义的抽象方法。
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回调observer的onSubscribe方法
observer.onSubscribe(parent);
try {
//原始observer订阅被观察者
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在这个subscribeActual方法中,将会创建出CreateEmitter类,这个类是用来发射我们定义的发射数据的,然后将被观察者和观察者建立联系。
Observable.subscribeOn
对于rxjava中的线程切换,那是非常方便的,subscribeOn就是其中的一个线程切换的方法,subscribeOn是用来指定被观察者执行的线程的。
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
这个方法同样的会返回一个关于线程切换的封装类ObservableSubscribeOn。首先,我们先来了解一下rxjava常用的线程Scheduler
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); //指定单个调度线程
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); //主要用于复杂计算
IO = RxJavaPlugins.initIoScheduler(new IOTask()); //主要用于网络请求或者IO操作
TRAMPOLINE = TrampolineScheduler.instance(); //在同一线程,但是不马上执行
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); //创建出一个新的线程
}
其中最经常用到的就是IO线程了,网络请求推荐使用IO线程,本地数据操作推荐使用IO线程。假如我们对同一个Observable进行多次切换线程,起作用的只有第一个切换的线程。这个的原理就要看看ObservableSubscribeOn的实现了。
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//创建出SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//回调onSubscribe方法
s.onSubscribe(parent);
//设置Disposable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
上面的几步最重要的就是scheduler.scheduleDirect(new SubscribeTask(parent)),这个方法首先会创建SubscribeTask,这个其实就是就是一个runnable,在run方法中进行订阅操作,订阅操作执行的就是我们定义发射方法。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
一旦我们执行了scheduleDirect方法之后,就会马上执行SubscribeTask的run方法。那么一旦我们切换了多个被观察者的执行线程,比如下面:
Observable.interval(1000, TimeUnit.MICROSECONDS)
.subscribeOn(Schedulers.io()).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation());
首先会经过第一层封装,在io线程的SubscribeTask中执行原始的Observable定义的发射方法。然后经过第二层封装,在newThread中执行第一层封装后的方法,最后经过computation的封装,在computation指定的线程中执行第二层封装的方法,由此可见,最后执行原始Observable的线程还是在第一个指定的线程中执行。再举个更简单的例子:
new Thread {
@Override
public void run() {
new Thread {
@Override
public void run() {
//执行原始被观察者的发射方法
...
}
}
}
}
不管外部包了多少层,最终执行原始被观察者的发射方法的线程也都是在最内部的线程执行的。
Obserable.observeOn
rxjava中,用observeOn指定后面的观察者的执行线程。
public final Observable<T> observeOn(Scheduler scheduler) {
//bufferSize为默认的缓存区大小,为BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128))
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
执行了observeOn方法时,会将先前的Observable封装成ObservableObserveOn
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建出一个Work
Scheduler.Worker w = scheduler.createWorker();
//订阅ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
在ObserveOnServer方法中,对于每一个emitter发射出来的事件
@Override
public void onNext(T t) {
...
schedule();
}
@Override
public void onError(Throwable t) {
...
schedule();
}
@Override
public void onComplete() {
...
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
对于rxjava设置的observeOn方法,只有最后一个会生效,这个原因是因为每执行一次,都会将其放置在工作Looper执行。比如下面的例子:
Observable.interval(1000, TimeUnit.MICROSECONDS)
.subscribeOn(Schedulers.io()).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
首先会将原始的Observable放在computation线程中执行,io线程封装computation线程,最后主线程封装io线程。因为observeOn对后面的obsever起作用,所以最后subscribe执行的线程将会是最外层的线程。
new Thread {
@Override
public void run() {
new Thread {
@Override
public void run() {
//执行原始被观察者的发射方法
...
}
}
//执行observer的方法
...
}
}
其实subscribe和observeOn比较类似,只是两个起作用的区域不一样,subscribe对前面起作用,所以最后执行的线程是最内层的。observeOn对后面起作用,所以最后执行到的线程是在最外层的线程。
Observable.subscribe
这个是rxjava订阅的入口,如果未指明创建立即执行的话,默认是在订阅方法执行时执行。这个方法接收一个Observer,也就是观察者。
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
直接返回的Disposable,
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//封装程lambdaObserver
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
//订阅入口
subscribe(ls);
return ls;
}
最终在subscribe中调用了subscribeActual
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
对于这个subscribeActual我们应该很熟悉,对于每一个Observable都需要实现这个方法。这个方法是在Observable定义的抽象方法。这样,就会调用到特定Observable的subscribeActual,逐层调用,最后调用到原始的Observable的emitter方法,发射数据。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//调用了被观察者的发射方法。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
rxjava常用的操作符
比较常用的有map,flatMap,merge、zip、concat、takeUntil、concat,详情可见官方的中文使用文档https://github.com/mcxiaoke/RxDocs
那么rxjava的操作符是如何实现的呢?我们通过map来看看rxjava中的操作符实现。
使用:
Observable.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "haha";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
map将会将流中1转化为String类型,并且向下传递给subscribe订阅者。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
实际上,最后是创建了一个ObservableMap实例
//我们定义的map的function
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
source表示的原始的流,function表示我们提供的转化方法。在上面的例子就是source为1,function就是new Function<Integer, String>()。我们知道,在subscribe调用时,将会调用到subscribeActual方法,也就是调用到ObservableMap的subscribeActual上。在这个方法上将会给source重新订阅一个MapObserver。
//转化方法
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
订阅完后,其实最后会调用到了onNext方法。
@Override
public void onNext(T t) {
...
U v;
try {
//首先会执行map转化方法
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//向下传递转化后的元素。
actual.onNext(v);
}
其实在这个方法中,首先会通过map接收上一步传递下来的元素,通过function的转化,返回对应的类型,并且向下传递。
也就是说整体的调用流程是这样的:
ObservableMap.subscribe -> ObservableMap.subscribeActual -> source.subscribe -> source.subscribeActual -> MapObserver.onNext -> map.apply -> Observer.next
先通过subscribe和subscribeActual进行逐级订阅,然后从source一直往下执行onNext,遇到操作符就进行转化,直到最后一个observer。
Schedulers的内部实现
subscribeOn
在我们调用到subscribeOn时,其实会创建出对应的ObservableSubscribeOn
new ObservableSubscribeOn<T>(this, scheduler)
接�收了传递进去的Scheduler和原始的observable。也是基于原始的obsevable做了一层包装,所以,最终的执行代码也会在subscribeActual中
@Override
public void subscribeActual(final Observer<? super T> s) {
//对observer进行封装,
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
最终会调用到scheduler.scheduleDirect(new SubscribeTask(parent))。
这个SubscribeTask是一个runnable
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它会在run方法中,进行subscribe,也就是说,哪个线程执行这个subscribe,将会在那个线程执行被观察者的操作。
在scheduleDirect方法中,将会进行scheduler的调度。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
首先是createWorker操作,这个是一个抽象方法,需要具体的Scheduler实现,也就是每一个Scheduler都会有一个单独的Workder实例。最终调用worker的schedule方法,在schedule方法又会调用scheduleActual方法,下面针对不同的Scheduler看看具体的实现
NewThreadWorker及其子类
对应的就是Schedulers.newThread(),Schedulers.io()等。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
...
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
Future<?> f;
...
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
...
return sr;
}
内部会先创建出ScheduledRunnable,并且通过delayTime和线程池来进行操作。这个executor是核心线程大小为1的线程池。
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
HandlerWorker
这个Android才会有的Worker,主要是用来切换主线程的。
首先它会持有主线程的handle
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
在schedule中会通过handler将这个任务post出去。
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
...
return scheduled;
}
observerOn
在入口处,和subscribeOn基本一致,将会将Observable转化为ObservableObserveOn,下面是其实际执行的代码块subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
//如果是在当前线程,
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//有指定线程切换
Scheduler.Worker w = scheduler.createWorker();
//通过worker包装observer
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
其实subscribeOn是包装的observable的,而observerOn是包装observer的。
也就是最终会执行到ObserveOnObserver的onNext方法上。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
在onNext方法中调用schedule,执行worker的schedule
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
网友评论