1.简单使用
Observable
.just("miaozi")// 返回 ObservableJust
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
//可进行耗时操作
return s;
}
})// 返回 ObservableMap
.subscribeOn(Scheduler.io())// 返回 ObservableSubscribeOn
.observeOn(Scheduler.mainThread())// ObservableObserveOn
.subscribe(new Consumer<String>() {//开始执行
@Override
public void accept(String s) throws Exception {
Log.e("TAG",s);
}
});
这个是我写的一个简单的RxJava中的map使用和子线程和主线程之间的调度使用。目的主要是加深对RxJava的理解。说实话,我自己是看视频对源码进行分析的,分析了有一个星期的时间吧,到现在才弄懂其中的源码。
RxJava是一种响应式的编码思想,可以是采用链式和递归的方式来调用的,还采用的静态代理的设计模式来代理使用,下面我在一一介绍和分析。
2.分析
第一步:just返回的是一个ObservableJust对象,里面存储一个 value 值 和实现了 subscribeActual方法。
public final class ObservableJust<T> extends Observable<T>{
private final T value;
public ObservableJust(final T value) {
this.value = value;//miaozi
}
@Override
protected void subscribeActual(Observer<T> observer) {
Log.e("TAG","ObservableJust" + " observer="+observer.toString());
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe();
sd.run();
}
}
第二步:第一步的基础之上 ObservableJust.map() 返回一个ObservableMap对象,里面有两个值,一个是source,source其实就是ObservableJust对象。一个是function对象。还是实现了subscribeActual方法。
/**
* created by panshimu
* on 2019/11/13
*/
public class ObservableMap<T, U> extends Observable<U>{
final Observable<T> source;//其实就是 ObservableJust
final Function<T,U> function;
public ObservableMap(Observable<T> source, Function<T,U> function) {
this.function = function;
this.source = source;
}
@Override
protected void subscribeActual(Observer<U> observer) {
Log.e("TAG","ObservableMap"+" observer="+observer.toString());
//静态代理 调用的是ObservableJust中的subscribeActual
source.subscribe(new MapObserver<T>(observer,function));
}
private final class MapObserver<T> implements Observer<T> {
final Function<T,U> function;
final Observer<U> observer;
public MapObserver(Observer<U> observer, Function<T,U> function) {
this.function = function;
this.observer = observer;
}
@Override
public void onSubscribe() {
observer.onSubscribe();
}
@Override
public void onNext(@NonNull T t) {
try {
U apply = function.apply(t);
observer.onNext(apply);
}catch (Exception e){
observer.onError(e);
}
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
第三步骤:切换子线程 subscribeOn 返回一个ObservableSubscribeOn对象,里面有两个值,一个是source,source就是上一个ObservableMap对象,还有一个值就是Scheduler,并且也实现了subscribeActual方法。
package com.miaozi.myrxjava;
import android.util.Log;
/**
* created by panshimu
* on 2019/11/15
*/
public class ObservableSubscribeOn<T> extends Observable<T> {
final Observable<T> source;//上一个 ObservableMap
final Scheduler scheduler;//IOScheduler
public ObservableSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<T> observer) {
Log.e("TAG","ObservableSubscribeOn" + " observer="+observer.toString());
//执行的是 IOScheduler 中的 scheduleDirect 然后会调用 SubscribeTask 中的run方法
scheduler.scheduleDirect(new SubscribeTask(source,observer));
}
}
public class SubscribeTask<T> implements Runnable {
Observer<T> observer;//
final Observable<T> source;//ObservableMap
public SubscribeTask(Observable<T> source, Observer<T> observer) {
this.source = source;
this.observer = observer;
}
@Override
public void run() {
Log.e("TAG", "SubscribeTask" + " 切换到子线程-->"+ source.toString());
//调用ObservableMap中的 subscribeActual
source.subscribe(observer);
}
}
第四步:observeOn 切换回主线程 返回一个 ObservableObserveOn 对象,这个里面也是存储两个值,一个是source,source就是上一层ObservableSubscribeOn对象,另一个就是Scheduler了。这个也实现了subscribeActual方法。
package com.miaozi.myrxjava;
import android.util.Log;
import androidx.annotation.NonNull;
/**
* created by panshimu
* on 2019/11/15
*/
class ObservableObserveOn<T> extends Observable<T> {
Observable<T> source;//ObservableSubscribeOn
Scheduler scheduler;
public ObservableObserveOn(Observable<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<T> observer) {
Log.e("TAG","ObservableObserveOn"+ " observer="+observer.toString());
//调用ObservableSubscribeOn中的subscribeActual
source.subscribe(new ObserveOnObserver(observer));
}
private class ObserveOnObserver implements Observer<T>, Runnable {
private T item;
final Observer<T> observer;
public ObserveOnObserver(Observer<T> observer) {
this.observer = observer;
}
@Override
public void onSubscribe() {
observer.onSubscribe();
}
@Override
public void onNext(@NonNull T t) {
item = t;
scheduler.scheduleDirect(this);
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
@Override
public void run() {
observer.onNext(item);
}
}
}
第五步:subscribe 调用的时候ObservableObserveOn父类中的subscribe,最终调用ObservableObserveOn中的subscribeActual(observer); observer就是Consumer。也就是调用上层的subscribeActual。
可以发现他其实是一条链子在调用,到最后一个要开始执行的时候,根据它的source也就是上级,一层一层的网上递归的调用,知道最上层然后执行方法又一层一层往回调用。
这里说明一下切换主线程和子线程,看源码不难发现切换到子线程是开启一个线程池,然后把 source.subscribe(observer);放到子线程的run方法中,就可以实现子线程的切换,然后又怎么切换到主线程呢?
切换主线程:
package com.miaozi.myrxjava;
import android.os.Handler;
import android.os.Message;
/**
* created by panshimu
* on 2019/11/18
*/
public class MainScheduler extends Scheduler {
private Handler handler;
public MainScheduler(Handler handler) {
this.handler = handler;
}
@Override
public void scheduleDirect(Runnable runnable) {
Message obtain = Message.obtain(handler, runnable);
handler.sendMessage(obtain);
}
}
这段话很简单把?一开始我也猜想到是用 handler 的方式,一看就是。但是跟我们正常的切换时不一样的,并没有实现handlerMassage()方法的重写,是因为handler的源码中放我们实现了,具体去看handler源码中的callback。
再看MapObserver中的onNext()
public void onNext(@NonNull T t) {
try {
U apply = function.apply(t);
observer.onNext(apply);
}catch (Exception e){
observer.onError(e);
}
}
这里的onNext就是不断的下载传递,进行value的转换,最后回执行到LambdaObserver中的onNext()
@Override
public void onNext(@NonNull T t) {
try {
this.onNext.accept(t);
} catch (Exception e) {
e.printStackTrace();
}
}
看到这里就可以看到 accept()方法了。这是最后最后的回调回去了。
网友评论