这次扒一扒rxjava的源码,在此声明一下我写博客的目的不是为了什么给新人教导,完全是自己做做笔记而已,当然为了让自己的笔记更有意义和便于自己复习,也会尽量写的通俗易懂一些,期间也会到处借鉴别人写的好的一些博客,不是为了功利性目的,如果万幸你看到这篇文章,还请多多包容。
此次用到的版本是2.1.7,导入依赖:
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
想想自己刚接触到rxjava的时候也是很懵逼的,因为没有事先了解到这个框架的核心思想,所以总带着自己以前的思维去看待,结果学起来就觉得很费劲,后来看到darren大佬的视频,讲到的响应式编程思想,瞬间就悟了。
其实编程最核心也最有魅力的地方,个人觉得不是去实现那些复杂的业务,而是发现世界万物的本质,提取其核心思想和提炼各个流程的关键步骤,然后用代码去实现他们封装他们表达他们。比如面向对象编程,光一句--万物皆是对象,其实就是一个非常具有哲理的话,而把这个思想具化成编程语言的主框架,这就是面向对象编程的牛逼之处。
个人是这样理解rxjava的核心思想的:
但凡做一件稍微复杂点的事情,日常生活中我们都是会把它分为几个步骤去完成,从第二个步骤开始之后的每一个步骤都是基于上一个步骤的结果来,通过层层加工作业最终做完了这件事情,最符合这种特征的事件就是工厂流水线作业了,假设要组装一个字符串abc,如果用rxjava来组装是这么写:
Observable.just("a")
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s + "b";
}
})
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s + "c";
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
@Override
public void onNext(@NonNull String str) {
Log.d("TAG", str);
}
@Override
public void onError(@NonNull Throwable e) {}
@Override
public void onComplete() {}
});
打印日志:
2021-07-06 14:15:03.564 30737-30737/com.trendlab.rxjavaex D/TAG: abc
可以看到从开始传入的a,经过2个map的分别加工,最终到最后一个步骤变成了abc,你可能觉得费这么老大劲实现这种功能实在没什么可夸奖的,这样的写法又有什么优越性呢?
这个例子我只是想说明rxjava的结构性是如流水线一样的工作原理,那我再举一个例子,涉及到异步线程和线程切换的事件组合。
比如从网上下载一张图片并放到我们的imageView上显示,如果不用rxjava你应该是这么写的:开一个线程下载图片,下载完后用handler切换到主线程给imageView设置一下图片,对吧?这样写不是不行,但是写起来非常麻烦。
开发者要自己管理线程和手动做线程切换调度,这些都不是业务代码,开发者在工作中应该尽量去专注业务而少去造轮子,而rxjava就是这种轮子,它已经帮我们把这些轮子帮我们造好了,看看它是怎么写的:
Observable.just("https://img1.sycdn.imooc.com/5b8ab353000181b204000284.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
//1.下载图片
URL url = new URL(s);
URLConnection urlConnection = url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
return BitmapFactory.decodeStream(inputStream);
}
})
.subscribeOn(Schedulers.io())//io线程下载
.observeOn(AndroidSchedulers.mainThread())//android主线程接收图片bitmap
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
//2.设置图片
ivImage.setImageBitmap(bitmap);
}
});
传入图片地址,下载图片,指定下载的线程,指定更新UI的线程,设置图片,简单吧?清晰明朗吧?一条线下来,一步一步走,这就是rxjava的核心思想,流式事件响应。
下面来研究它是怎么做到的。
先不管那些复杂的变换和线程调度,先从简单的一个观察者和被观察者的案例来看看,所谓的流式响应是怎样发生的:
Observable.just("hello rxjava")//just方法构建了一个observable可观察对象
//new了一个Observer观察者
//subscribe()方法,观察者订阅了一个可观察对象
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("TAG", "onSubscribe");
}
@Override
public void onNext(@NonNull String str) {
Log.d("TAG", str);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d("TAG", "onError");
}
@Override
public void onComplete() {
Log.d("TAG", "onComplete");
}
});
这里不能以以前的观察者和被观察者逻辑去理解,rxjava的思想是响应式编程,所以它要做的事情都是即时的,而且它的订阅者只有一个,并且在调用subscribe()方法的时候被观察者就会开始行动执行数据流转了。
先来看看just()方法,它的任务就是创建了一个Observable,可观察对象:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
//这个方法可理解成直接返回了入参source,因为我们没有给onObservableAssembly赋值
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
//看看这个ObservableJust类
// 继承自Observable,实现了ScalarCallable接口
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
//这个value是我们传的那个字符串
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
//这个方法很重要,到时候会用到
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
由此我们可以知道just()方法的主要工作就是new了一个ObservableJust对象,这个对象是Observable的子类,实际上Observable有很多子类,这里我们只分析其中一种,其他的原理类似。
再来看看订阅者,这里直接new了一个匿名类Observer,看看Observer的代码:
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
很简单的一个接口,看来关键代码就是那个subscribe()了:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//把observer作为入参,类似onAssembly方法,其实就是返回了入参observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空操作
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//熟悉的方法,这个方法就是之前提到要注意的的observable里的方法
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
所以这里的关键代码其实就是调用了observable的subscribeActual()方法,我们再来回顾一下这个方法做了哪些动作,从而调动了观察者接收被观察者的行为:
@Override
protected void subscribeActual(Observer<? super T> s) {
//new了一个ScalarDisposable对象,注意入参是观察者和被观察者的value
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
//首先调用了Observer的onSubscribe()方法,
s.onSubscribe(sd);
//sd的run() 方法
sd.run();
}
看看ScalarDisposable类:
/**
* Represents a Disposable that signals one onNext followed by an onComplete.
*
* @param <T> the value type
*/
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
final Observer<? super T> observer;
final T value;
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
//注意入参是观察者和被观察者的value
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
@Override
public boolean offer(T value) {
throw new UnsupportedOperationException("Should not be called!");
}
@Override
public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called!");
}
@Nullable
@Override
public T poll() throws Exception {
if (get() == FUSED) {
lazySet(ON_COMPLETE);
return value;
}
return null;
}
@Override
public boolean isEmpty() {
return get() != FUSED;
}
@Override
public void clear() {
lazySet(ON_COMPLETE);
}
@Override
public void dispose() {
set(ON_COMPLETE);
}
@Override
public boolean isDisposed() {
return get() == ON_COMPLETE;
}
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
lazySet(FUSED);
return SYNC;
}
return NONE;
}
//run()方法被调用
@Override
public void run() {
//判断一下当前状态,如果是START 则将其设置为ON_NEXT
if (get() == START && compareAndSet(START, ON_NEXT)) {
//执行了observer的onNext()方法
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
//执行了observer的onComplete()方法
observer.onComplete();
}
}
}
}
由此可知,一旦订阅了,则被订阅者就会开始执行任务把结果传给被订阅者,而且会通知被订阅者执行相关的方法,这就是它基本的工作流程,下篇我们看看事件变换map()是怎么包装一层层转换事件的。
网友评论