美文网首页Android开发经验谈Android技术知识Android开发
第三方开源库 RxJava - 基本使用和源码分析

第三方开源库 RxJava - 基本使用和源码分析

作者: 你也不知道 | 来源:发表于2020-07-01 22:00 被阅读0次

    RxJava 历史有点悠久,目前最新版是 2.x 的版本,网络上有很多关于 RxJava 的文章, 随便搜搜一大堆。为什么还要来写一些文章,毕竟那是别人的东西,并没有变成我的知识,其次课程具体的内容有安排,所以我们还是自己动手写写吧。还是老套路从源码的角度出发,当然 RxJava 用了这么久,我们应该也有自己的一些理解,其实就是三个字:事件流

    很多人一开始就从观察者设计模式入手去分析,这个也不说行不通也蛮好的,这里我用事件流的方式来讲解一些。到底什么是事件流?你可以想象一条河流最终将涌入大海,那么中间会经过湖泊山川,合并、分流、等等,这就是一个流,中间会经历很多,但最终会流入大海,整个过程是一条连起来的线。

    根据 android 的应用来分析,我们打开 app 最终又会退出 app,那么整个 app 的应用我们都可以看成是一个大的事件流,里面像 Click 点击事件,权限申请,线程切换,网络访问,第三方分享登录等等,也都可以看成是事件流。所以在 RxJava 之后就接连着有很多像 RxAndroid 、RxBus 、RxPermission 这些好用的一些第三方库,以后肯定还会有很多基于事件流的第三方库。那按照你这么说,是不是所有的代码都可以基于事件流去写?按理解是这个样子的,比如说第三方分享和第三方登录等等。接下来我通过一个非常简单的小事例来讲解一下,比如下载图片加水印显示到 ImageView 控件上。

    当然这些是我个人的理解,你应该也有自己的理解。这也是我为什么呼吁大家自己去写文章的原因,实在拿不出手就设为私密的嘛,但终归自己总结吸收了一下。就拿设计模式来说,你就不再是背了,也不会拿模式去套,收货肯定是有的。当然我们写好一篇文章得要花一天左右的时间,的确需要这么久,不信你可以试试,就看你是不是很闲了。

    1.通俗写法


    下载图片加水印显示到 ImageView 控件上,我们之前写代码应该是,开个线程去下载图片 Bitmap,给 Bitmap 加上水印,通过 Handler 切换到主线程调用 ImageView 的 setImageBitmap() 方法我们来看下具体代码:

    public class MainActivity extends AppCompatActivity {
    
        private ImageView mImageView;
        private Handler mHandler = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                Bitmap bitmap = (Bitmap) msg.obj;
                mImageView.setImageBitmap(bitmap);
            }
        };
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            mImageView = (ImageView) findViewById(R.id.image_view);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        URL url = new URL("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg");
                        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                        connection.connect();
                        InputStream inputStream = connection.getInputStream();
                        // 通过流解析到 Bitmap
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        inputStream.close();
                        // 给图片 Bitmap 加水印
                        bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
                        // 通过 Handler 发送消息切换到主线程
                        Message message = Message.obtain();
                        message.obj = bitmap;
                        mHandler.sendMessage(message);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
    
                }
            }).start(); 
        }
    }
    

    2.事件流写法


    上面代码运行起来了,一看没毛病挺好的堪称完美,也不觉得麻烦(作)。接下来我们看下基于事件流的写法,当然这里我们使用 RxJava,先来画个流向图:


    事件流向
            Observable.just("http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg")
                    .map(new Function<String, Bitmap>() { // 下载网络图片
                        @Override
                        public Bitmap apply(@NonNull String imagePath) throws Exception {
                            URL url = new URL(imagePath);
                            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                            connection.connect();
                            InputStream inputStream = connection.getInputStream();
                            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                            inputStream.close();
                            return bitmap;
                        }
                    })
                    .map(new Function<Bitmap, Bitmap>() {// 给图片加水印
                        @Override
                        public Bitmap apply(@NonNull Bitmap bitmap) throws Exception {
                            bitmap = BitmapUtils.drawText2Bitmap(bitmap,"RxJava");
                            return bitmap;
                        }
                    })
                    .subscribeOn(Schedulers.io())// 上面之前的执行在子线程中(线程的调度)
                    .observeOn(AndroidSchedulers.mainThread())// 下面之后的执行在主线程中(线程的调度)
                    .subscribe(new Consumer<Bitmap>() {// 显示图片
                        @Override
                        public void accept(Bitmap bitmap) throws Exception {
                            imageView.setImageBitmap(bitmap);
                        }
                    });
    

    我记得第一次采用 RxJava 的时候还是很不习惯的,渐渐用多了就好了,关键还是多用多熟悉 RxJava 的 API 和事件流的思想就好了,上面这样写就简单很多,代码阅读起来也是非常清晰的。接下来我们自己动手来写一个事件流的库,不是为了重复造轮子,而是为了更好了解这种编程思想,在这之前我们得先把 RxJava 的源码走一遍。

    3. RxJava 的前奏

            String imageUrl = "http://img5.imgtn.bdimg.com/it/u=263117812,2521637551&fm=27&gp=0.jpg";
    
            Observable.just(imageUrl)
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.e("TAG","onSubscribe");
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.e("TAG","s = "+s);
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.e("TAG","onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("TAG","onComplete");
                        }
                    });
    

    上面这段代码只是一个小事例,代码的本身不具备任何意义,我们先来看下 Observable 这是我们之前所讲的观察者设计模式中的被观察对象,Observer 是观察者对象,不知道之前讲的我们是否还有印象。只不过这个有点特别,特别在哪里?我们之前都是首先去订阅注册,当被观察者发生改变时去通知观察者发生改变,但这里是我们只要一订阅注册就通知观察者发生改变,可以理解为观察者设计模式的变异版本。客观的角度也说明了我们不要去套和记某一种设计模式。看下具体的源码:

    // 返回的是这个 ObservableJust 
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    // 主要看 subscribeActual 方法
    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
        // value 就是值 ,说具体一点就是上面的 imageUrl 
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> s) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
            // 调用 Observer 的 onSubscribe 方法
            s.onSubscribe(sd);
            // 调用 sd 的 run 方法
            sd.run();
        }
    
        @Override
        public T call() {
            return value;
        }
    }
    
    // 主要看 run 方法
    public static final class ScalarDisposable<T>
        extends AtomicInteger
        implements QueueDisposable<T>, Runnable {
             // ...... 省略一些简单代码
            @Override
            public void run() {
                if (get() == START && compareAndSet(START, ON_NEXT)) {
                    // 调用 observer 的 onNext 方法
                    observer.onNext(value);
                    if (get() == ON_NEXT) {
                        lazySet(ON_COMPLETE);
                        // 调用 observer 的 onComplete 方法
                        observer.onComplete();
                    }
                }
            }
        }
    

    4. RxJava 的线程调度

    Rxjava 到底应该怎么做线程调度切换?我们其实可以猜一下,料想它也不可能写出花来。我们最上面的第一种写法是采用 线程 + Handler,那么我想 RxJava 肯定也是封装的 线程 + Handler 。而对于线程池和Handler源码不是特别熟悉的,文章看到这里应该可以停一下了,我们得去学习了解一下基础。

    .subscribeOn(Schedulers.io())// 上面之前的执行在子线程中
    .observeOn(AndroidSchedulers.mainThread())// 下面之后的执行在主线程中
    

    总共就两行代码,前几年作为小白的我刚开始用的时候,第一感觉就是真的好神奇啊!

    // 创建了一个 ObservableSubscribeOn 
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
    // 主要看 subscribeActual 方法
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            // 创建了一个 SubscribeOnObserver ,也就是把 SubscribeOnObserver 进行了一层包装
            // 这里其实就是之前所讲的代理设计模式
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            // 调用代理的 Observer 的 onSubscribe 方法
            s.onSubscribe(parent);
            // 把下面这个代码变为两行,容易看懂一点
            // parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
            Disposable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
            parent.setDisposable(disposable);
        }
    }
    // 看到这里差不多要明白了 implements Runnable 看样子要开线程的节奏
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    // Schedulers.io() 是它
    static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
    }
    // 单例设计模式 - 静态内部类
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    // 线程池 + 线程 + Runnable
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // 创建 线程池
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // 代理
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // 利用线程池去执行任务
        w.schedule(task, delay, unit);
    
        return task;
    }
    // 在子线程中执行 source.subscribe(parent); // 又要往上走,这是子线程处理的逻辑
    

    source.subscribe(parent); 执行在子线程中,source 是啥这个不用说了,然后开始往前走,所以这就是子线程的处理部分,其实挺简单的。接下来看下主线程的切换:

    // 创建了一个 ObservableObserveOn 
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
    // 主要还是看 subscribeActual 方法
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
           // ...... 省略部分代码
           // 创建一个 Scheduler.Worker 
           Scheduler.Worker w = scheduler.createWorker();
           // ObserveOnObserver 
           source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // 最后的 onNext 是 schedule() 
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    }
    // MAIN_THREAD 的 Scheduler 
    public final class AndroidSchedulers {
        private static final class MainHolder {
            // new Handler(Looper.getMainLooper()) 创建一个主线程的 Handler 对象
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
        
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override 
                public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
        });
    }
    // Handler 切换到主线程
    private static final class HandlerWorker extends Worker {
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            // 但是 handler 并没有复写 handleMessage 方法,那是怎么调用了方法?一切都在 Handler 源码中
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
        }
    }
    

    至于像 map 、flatMap、delay、filter 等等我们都可以去看一下源码,但是注意别太深究细节,因为很多地方涉及到数据结构和算法,有时候看一些细节代码的确比较头疼。后面我们还是自己动手写一下,加深一下事件流(响应式)编程思想。

    所有分享大纲:Android进阶之旅 - 系统架构篇

    视频讲解地址:https://pan.baidu.com/s/1jIf7SqU

    相关文章

      网友评论

        本文标题:第三方开源库 RxJava - 基本使用和源码分析

        本文链接:https://www.haomeiwen.com/subject/pekkqktx.html