RxJava 历史有点悠久,目前最新版是 2.x 的版本,网络上有很多关于 RxJava 的文章, 随便搜搜一大堆。为什么还要来写一些文章,毕竟那是别人的东西,并没有变成我的知识,其次课程具体的内容有安排,所以我们还是自己动手写写吧。还是老套路从源码的角度出发,当然 RxJava 用了这么久,我们应该也有自己的一些理解,其实就是三个字:事件流
很多人一开始就从观察者设计模式入手去分析,这个也不说行不通也蛮好的,这里我用事件流的方式来讲解一些。到底什么是事件流?你可以想象一条河流最终将涌入大海,那么中间会经过湖泊山川,合并、分流、等等,这就是一个流,中间会经历很多,但最终会流入大海,整个过程是一条连起来的线。
根据 android 的应用来分析,我们打开 app 最终又会退出 app,那么整个 app 的应用我们都可以看成是一个大的事件流,里面像 Click 点击事件,权限申请,线程切换,网络访问,第三方分享登录等等,也都可以看成是事件流。所以在 RxJava 之后就接连着有很多像 RxAndroid 、RxBus 、RxPermission 这些好用的一些第三方库,以后肯定还会有很多基于事件流的第三方库。那按照你这么说,是不是所有的代码都可以基于事件流去写?按理解是这个样子的,比如说第三方分享和第三方登录等等。接下来我通过一个非常简单的小事例来讲解一下,比如下载图片加水印显示到 ImageView 控件上。
当然这些是我个人的理解,你应该也有自己的理解。这也是我为什么呼吁大家自己去写文章的原因,实在拿不出手就设为私密的嘛,但终归自己总结吸收了一下。就拿设计模式来说,你就不再是背了,也不会拿模式去套,收货肯定是有的。当然我们写好一篇文章得要花一天左右的时间,的确需要这么久,不信你可以试试,就看你是不是很闲了。
1.通俗写法
下载图片加水印显示到 ImageView 控件上,我们之前写代码应该是,开个线程去下载图片 Bitmap,给 Bitmap 加上水印,通过 Handler 切换到主线程调用 ImageView 的 setImageBitmap() 方法我们来看下具体代码:
public class MainActivity extends AppCompatActivity {
private ImageView mImageView;
private Handler mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
Bitmap bitmap = (Bitmap) msg.obj;
mImageView.setImageBitmap(bitmap);
}
};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mImageView = (ImageView) findViewById(R.id.image_view);
new Thread(new Runnable() {
@Override
public void run() {
try {
URL url = new URL("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.connect();
InputStream inputStream = connection.getInputStream();
// 通过流解析到 Bitmap
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
inputStream.close();
// 给图片 Bitmap 加水印
bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
// 通过 Handler 发送消息切换到主线程
Message message = Message.obtain();
message.obj = bitmap;
mHandler.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
2.事件流写法
上面代码运行起来了,一看没毛病挺好的堪称完美,也不觉得麻烦(作)。接下来我们看下基于事件流的写法,当然这里我们使用 RxJava,先来画个流向图:
事件流向
Observable.just("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg")
.map(new Function<String, Bitmap>() { // 下载网络图片
@Override
public Bitmap apply(@NonNull String imagePath) throws Exception {
URL url = new URL(imagePath);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.connect();
InputStream inputStream = connection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
inputStream.close();
return bitmap;
}
})
.map(new Function<Bitmap, Bitmap>() {// 给图片加水印
@Override
public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
return bitmap;
}
})
.subscribeOn(Schedulers.io())// 上面之前的执行在子线程中(线程的调度)
.observeOn(AndroidSchedulers.mainThread())// 下面之后的执行在主线程中(线程的调度)
.subscribe(new Consumer<Bitmap>() {// 显示图片
@Override
public void accept(Bitmap bitmap) throws Exception {
imageView.setImageBitmap(bitmap);
}
});
我记得第一次采用 RxJava 的时候还是很不习惯的,渐渐用多了就好了,关键还是多用多熟悉 RxJava 的 API 和事件流的思想就好了,上面这样写就简单很多,代码阅读起来也是非常清晰的。接下来我们自己动手来写一个事件流的库,不是为了重复造轮子,而是为了更好了解这种编程思想,在这之前我们得先把 RxJava 的源码走一遍。
3. RxJava 的前奏
String imageUrl = "http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg";
Observable.just(imageUrl)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("TAG","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("TAG","s = "+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("TAG","onError");
}
@Override
public void onComplete() {
Log.e("TAG","onComplete");
}
});
上面这段代码只是一个小事例,代码的本身不具备任何意义,我们先来看下 Observable 这是我们之前所讲的观察者设计模式中的被观察对象,Observer 是观察者对象,不知道之前讲的我们是否还有印象。只不过这个有点特别,特别在哪里?我们之前都是首先去订阅注册,当被观察者发生改变时去通知观察者发生改变,但这里是我们只要一订阅注册就通知观察者发生改变,可以理解为观察者设计模式的变异版本。客观的角度也说明了我们不要去套和记某一种设计模式。看下具体的源码:
// 返回的是这个 ObservableJust
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
// 主要看 subscribeActual 方法
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
// value 就是值 ,说具体一点就是上面的 imageUrl
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
// 调用 Observer 的 onSubscribe 方法
s.onSubscribe(sd);
// 调用 sd 的 run 方法
sd.run();
}
@Override
public T call() {
return value;
}
}
// 主要看 run 方法
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
// ...... 省略一些简单代码
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 调用 observer 的 onNext 方法
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
// 调用 observer 的 onComplete 方法
observer.onComplete();
}
}
}
}
4. RxJava 的线程调度
Rxjava 到底应该怎么做线程调度切换?我们其实可以猜一下,料想它也不可能写出花来。我们最上面的第一种写法是采用 线程 + Handler,那么我想 RxJava 肯定也是封装的 线程 + Handler 。而对于线程池和Handler源码不是特别熟悉的,文章看到这里应该可以停一下了,我们得去学习了解一下基础。
.subscribeOn(Schedulers.io())// 上面之前的执行在子线程中
.observeOn(AndroidSchedulers.mainThread())// 下面之后的执行在主线程中
总共就两行代码,前几年作为小白的我刚开始用的时候,第一感觉就是真的好神奇啊!
// 创建了一个 ObservableSubscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
// 主要看 subscribeActual 方法
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 创建了一个 SubscribeOnObserver ,也就是把 SubscribeOnObserver 进行了一层包装
// 这里其实就是之前所讲的代理设计模式
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 调用代理的 Observer 的 onSubscribe 方法
s.onSubscribe(parent);
// 把下面这个代码变为两行,容易看懂一点
// parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
Disposable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
parent.setDisposable(disposable);
}
}
// 看到这里差不多要明白了 implements Runnable 看样子要开线程的节奏
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
// Schedulers.io() 是它
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
// 单例设计模式 - 静态内部类
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
// 线程池 + 线程 + Runnable
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建 线程池
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 代理
DisposeTask task = new DisposeTask(decoratedRun, w);
// 利用线程池去执行任务
w.schedule(task, delay, unit);
return task;
}
// 在子线程中执行 source.subscribe(parent); // 又要往上走,这是子线程处理的逻辑
source.subscribe(parent); 执行在子线程中,source 是啥这个不用说了,然后开始往前走,所以这就是子线程的处理部分,其实挺简单的。接下来看下主线程的切换:
// 创建了一个 ObservableObserveOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// 主要还是看 subscribeActual 方法
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
// ...... 省略部分代码
// 创建一个 Scheduler.Worker
Scheduler.Worker w = scheduler.createWorker();
// ObserveOnObserver
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// 最后的 onNext 是 schedule()
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
}
// MAIN_THREAD 的 Scheduler
public final class AndroidSchedulers {
private static final class MainHolder {
// new Handler(Looper.getMainLooper()) 创建一个主线程的 Handler 对象
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
}
// Handler 切换到主线程
private static final class HandlerWorker extends Worker {
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 但是 handler 并没有复写 handleMessage 方法,那是怎么调用了方法?一切都在 Handler 源码中
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
}
}
至于像 map 、flatMap、delay、filter 等等我们都可以去看一下源码,但是注意别太深究细节,因为很多地方涉及到数据结构和算法,有时候看一些细节代码的确比较头疼。后面我们还是自己动手写一下,加深一下事件流(响应式)编程思想。
所有分享大纲:Android进阶之旅 - 系统架构篇
网友评论