*概述
*基本用法及事件流程
*线程切换
概述
RxJava源码:https://github.com/ReactiveX/RxJava
RxJava出来也有好多年了,一直热度不减。网上也有大量的对这个框架的介绍:事件流、响应式编程、观察者模式......
不多说,相信每个了解过RxJava的同学都有自己的体会。这次笔记主要从RxJava的基本使用出发,从源码角度分析它的运作流程。希望能够以点带面,窥一窥这个框架的思想。
一、基本用法及事件流程
1、基本用法
直接贴用法:
public class HomeFragment extends Fragment {
private static final String TAG = "HomeFragment";
private static final String path = "https://ss0.baidu.com/7Po3dSag_xI4khGko9WTAnF6hhy/zhidao/pic/item/3bf33a87e950352ab1d3f8a45343fbf2b3118be8.jpg";
private HomeViewModel homeViewModel;
private FragmentHomeBinding binding;
private ImageView mImage;
public View onCreateView(@NonNull LayoutInflater inflater,
ViewGroup container, Bundle savedInstanceState) {
homeViewModel =
new ViewModelProvider(this).get(HomeViewModel.class);
binding = FragmentHomeBinding.inflate(inflater, container, false);
View root = binding.getRoot();
mImage = binding.beautifulImage;
playRxJava();
return root;
}
private void playRxJava() {
// 注释 1 第一个操作符 just
Observable.just(path)
// 注释 2 第二个操作符 map 1
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Throwable {
Bitmap bitmap = null;
try {
URL url = new URL(s);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
InputStream inputStream = httpURLConnection.getInputStream();
bitmap = BitmapFactory.decodeStream(inputStream);
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
Log.d(TAG, "map one = " + Thread.currentThread());
return bitmap;
}
})
//注释 3 第三个操作符 map 2
.map(new Function<Bitmap, Bitmap>() {
@Override
public Bitmap apply(Bitmap bitmap) throws Throwable {
Bitmap btm = createWatermark(bitmap, "碧云天");
Log.d(TAG, "map two = " + Thread.currentThread());
return btm;
}
})
// 注释 4 订阅的时候切换线程,只是第一次设置有效
.subscribeOn(Schedulers.io())
// 注释 5 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
.observeOn(AndroidSchedulers.mainThread())
// 注释 6
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Throwable {
mImage.setImageBitmap(bitmap);
Log.d(TAG, "map three = " + Thread.currentThread());
}
});
}
大概是这个模样:Observable.just().map().map().subscribeOn().observeOn().subscribe()。很简单,第一步下载图片(第一个map操作符),第二步给图片加文字(第二个map操作符),最后显示图片(subscribe参数里)。用法就不多说了,这次主要分析源码,下面来瞧一瞧。
2、源码分析
上面的使用方法线程切换的操作符也一并加上了,线程切换的部分源码待会儿下文会讲解,现在先讲事件订阅及变换流程。
这里先背一下书,上面例子中的事件流程分析会主要涉及到几个重要的类:ObservableJust、ObservableMap。主要分析1个 just操作符 和 2个 map操作符的实现。我们一个个来,先看 just() 方法的调用都干了啥:
// Observable.java
public static <T> Observable<T> just(@NonNull T item) {
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
}
我们看上面方法返回处,出现了框架中高频调用的 RxJavaPlugins.onAssembly() 这个方法,我们不用看它了,就关注参数里面的这个 new ObservableJust<>(item)对象就好。
我们看下ObservableJust这个类是个啥:
// ObservableJust.java
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
好了,上面看到了。ObservableJust继承自 被观察者基类 Observable。也就是说,我们上面基本用法处Observable.just(path)的 just 方法调用之后,返回来一个 Observable 被观察者对象。其实,看到链式调用我们不难理解,在建造者设计模式中,每次调用一个方法之后,都会将当前对象返回。而这里的建造者模式则略微不同,这里每次调用一个操作符,都会创建一个新的对象。只不过这些对象的类型全都继承自同一个基类。不信我们再往下看 调用 map 操作符都干了啥。上面基本使用处,我们点进第一个 map() 方法:
Observable.java
@NonNull
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
同样的,我们只关注方法返回值处的对象参数:new ObservableMap<>(this, mapper)。我们点开看一看 ObservableMap这个类 :
// ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
..........
可以看到,上面 ObservableMap类继承了 AbstractObservableWithUpstream类。而 AbstractObservableWithUpstream这个类最终也是继承了被观察者基类 Observable的:abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>{}。这里就不过多解释。也就是说,上面这里我们调用了 map() 操作符方法后,返回了一个新的被观察者 Observable对象,这个对象的类型是ObservableMap。
好,链式方法稍微总结一下:
just() -> 返回 ObservableJust对象(ObservableJust继承自 Observable)
map() -> 返回 ObservableMap对象(ObservableMap 继承自 Observable)
那么,上面的链式调用我们已经看了 just() 和 map()的返回值,相信下一个 map()返回了啥就不用讲了。
现在我们看最后一步订阅的方法 subscribe() 干了啥:
// Observable.java
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 注释 7 ,订阅方法链式调用
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
}
}
我们直接跳过了很多重载方法,看到了上面这个方法。别的部分不用看,就看注释 7 处 subscribeActual(observer);这个方法干了啥。好了,不要再点进去啦,再点进去就进死胡同了~~。在Observable.java基类里面,subscribeActual这个方法是一个抽象方法。所以我们要找到它的子类对象的重写方法。上一个环节调用的操作符是一个 map符。所以我们找到子类 ObservableMap.java.看看里面它重写的 subscribeActual()这个方法都干了啥:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
// 注释 8,调用上有被观察者 Observable对象的订阅方法 subscribe()
// source 是上有被观察者对象,
source.subscribe(new MapObserver<T, U>(t, function));
}
.......
看上面注释 8,恍然大悟。原来订阅方法 subscribe() 也是层层嵌套调用的,而且是从下游往上游调用。那么它要调用到哪个位置呢?不用说,肯定是第一个生成的被观察者对象处啊。上面基本用法中,第一个生成被观察者对象在哪?当然是 just()方法返回值处啊。接下来我们看看第一个被观察者对象 ObservableJust,它的最终订阅方法 subscribeActual()干了啥:
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
// 注释 9
sd.run();
}
......
上面可以看到,subscribeActual方法调用之后,从下游订阅方法传上来的观察者经过包装后,在注释 9处调用了 run() 方法。我们点进这个方法看一下:
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 注释 10 ,调用下游传上来的观察者 observer开始向下发送事件
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
好了,看到上面注释10的地方终于知道订阅方法自下而上调用,到了尽头要干嘛了。这是要自上而下发射事件,从而保证事件的发射顺序与操作符调用顺序一致。
下面画个草图记录一下大概的一个事件流程:

二、线程切换
下面我们来看看文章开头处基本用例的线程调度:
// 注释 11, 订阅的时候切换线程,只有第一次设置有效
.subscribeOn(Schedulers.io())
// 注释 12 , 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
.observeOn(AndroidSchedulers.mainThread())
订阅线程切换
我们先看注释 11 处,订阅时候的线程切换。首先看一下 Schedulers.io()这是个什么小东西,返回什么:
//Schedulers.io().java
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
-----------------------------------------------------------------
@NonNull
static final Scheduler IO;
------------------------------------------------------------------
IO = RxJavaPlugins.initIoScheduler(new IOTask());
------------------------------------------------------------------
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
------------------------------------------------------------------------
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
------------------------------------------------------------------------
源码拐了好几个弯,所以上面按顺序把片段贴出来。看上面最后一截,我们知道.subscribeOn(Schedulers.io())的 Schedulers.io() 返回了一个 new IoScheduler()对象。我们再看一下 IoScheduler 这个类:
// IoScheduler.java
public final class IoScheduler extends Scheduler {
...........................
// 注释 13 ,createWorker()方法
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
.............................
上面省略了一大堆,只保留了一个 createWorker()方法。另外,我们知道了 IoScheduler 是线程调度器 Scheduler派生的一个子类。现在知道这两点就行了,先码住,下面有用。
我们再回到上面线程调度的地方:
.subscribeOn(Schedulers.io())
下面看 subscribeOn这个方法干了啥:
// Observable.java
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
老规矩,别的不看。我们看一下上面方法返回值的参数 new ObservableSubscribeOn<>(this, scheduler),点进 ObservableSubscribeOn看一下:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
// 注释 14 , scheduler.scheduleDirect(new SubscribeTask(parent))
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
又是被观察者 Observable派生的一个子类 ObservableSubscribeOn。我们先看一下 new SubscribeTask(parent)这个对象是个啥,点进去看一下:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 执行上游被观察者的订阅方法
source.subscribe(parent);
}
}
看上面,SubscribeTask 是一个 Runnable,里面的run() 方法执行了上游被观察者的订阅方法。这有何目的?我们 再往下看上面注释 14 处scheduler.scheduleDirect(new SubscribeTask(parent)) 方法参数还带一个Runnable对象要干啥:
// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 注释 15 ,createWorker()方法
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
// 注释 16, 好像 schedule在执行什么
w.schedule(task, delay, unit);
return task;
}
看注释16这个w.schedule的操作好像是在放什么大招,但一脸懵逼,上面注释 15处 createWorker()方法点进去是个抽象方法。那 w 是啥?咦?createWorker是不是很熟悉?对咯,上面有说到,还码住了。既然当前在 Scheduler基类,createWorker又是抽象方法。想当初用户又传进来 Schedulers.io() 这么一个对象(Schedulers.io == new IoScheduler() )。而 IoScheduler又是派生的子类,那 createWorker()当然要在子类 IoScheduler里找啊。好吧,再贴以下:
// IoScheduler.java
public final class IoScheduler extends Scheduler {
...........................
// 注释 13 ,createWorker()方法
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
.............................
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 注释 17 放大招
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
好,话不多说,createWorker()方法返回一个 EventLoopWorker对象。我们看一下上面 EventLoopWorker这个类,看上面注释 17的地方好像又在放大招。我们点进threadWorker.scheduleActual()这个方法看一下:
// 注释 18 ,ScheduledExecutorService
private final ScheduledExecutorService executor;
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 注释 19
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
OK,看上面注释 18的变量和注释 19处的操作,是不是似曾相识?没错,线程池ScheduledExecutorService 。就是之前把上游被观察者对象的订阅方法的执行打包进了一个 Runnable,经过层层流转之后掉进了这个线程池里了...
观察者线程切换
// 注释 12 , 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
.observeOn(AndroidSchedulers.mainThread())
上面注释 12处,我们还是先点进 AndroidSchedulers.mainThread()看一下这个主线程是怎么来的:
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
----------------------------------------------------------------------------
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
-------------------------------------------------------------------------------
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
又是拐了几个弯。好了,看上面就行了。Looper.getMainLooper()、Handler...
主线程的 Handler被打包进线程调度器 Scheduler 里,估计又是一轮操作猛如虎的参数层层传递。我们开始点进上面注释 12处,看一下 observeOn方法把主线程的Handler传到哪,要干嘛:
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
-------------------------------------------------------------------------------
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
----------------------------------------------------------------------------------
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
void schedule() {
if (getAndIncrement() == 0) {
// 注释 20
worker.schedule(this);
}
}
又是一顿操作猛如虎,看到上面注释 12我们还是再看看 当初包装主线程 Handler、继承自Scheduler的 HandlerScheduler的内部类Worker的schedule方法干了啥把:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
..............
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
...................
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 注释 21 ,handler
handler.sendMessageDelayed(message, unit.toMillis(delay));
.................
好了,别的不看了。看上面注释 21。主线程 handler,老一套了......
网友评论