美文网首页
RxJava 的基本使用及源码分析(订阅流程+线程切换)

RxJava 的基本使用及源码分析(订阅流程+线程切换)

作者: 碧云天EthanLee | 来源:发表于2021-07-13 22:51 被阅读0次

*概述
*基本用法及事件流程
*线程切换

概述

RxJava源码:https://github.com/ReactiveX/RxJava
RxJava出来也有好多年了,一直热度不减。网上也有大量的对这个框架的介绍:事件流、响应式编程、观察者模式......

不多说,相信每个了解过RxJava的同学都有自己的体会。这次笔记主要从RxJava的基本使用出发,从源码角度分析它的运作流程。希望能够以点带面,窥一窥这个框架的思想。

一、基本用法及事件流程

1、基本用法
直接贴用法:

public class HomeFragment extends Fragment {
    private static final String TAG = "HomeFragment";
    private static final String path = "https://ss0.baidu.com/7Po3dSag_xI4khGko9WTAnF6hhy/zhidao/pic/item/3bf33a87e950352ab1d3f8a45343fbf2b3118be8.jpg";
    private HomeViewModel homeViewModel;
    private FragmentHomeBinding binding;
    private ImageView mImage;

    public View onCreateView(@NonNull LayoutInflater inflater,
                             ViewGroup container, Bundle savedInstanceState) {
        homeViewModel =
                new ViewModelProvider(this).get(HomeViewModel.class);
        binding = FragmentHomeBinding.inflate(inflater, container, false);
        View root = binding.getRoot();
        mImage = binding.beautifulImage;
        playRxJava();
        return root;
    }
    private void playRxJava() {
         // 注释 1 第一个操作符 just
        Observable.just(path)
         // 注释 2 第二个操作符 map 1
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String s) throws Throwable {
                        Bitmap bitmap = null;
                        try {
                            URL  url = new URL(s);
                            HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                            InputStream inputStream = httpURLConnection.getInputStream();
                            bitmap = BitmapFactory.decodeStream(inputStream);
                        } catch (MalformedURLException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        Log.d(TAG, "map one = " + Thread.currentThread());
                        return bitmap;
                    }
                })
                //注释 3 第三个操作符 map 2
                .map(new Function<Bitmap, Bitmap>() {
                    @Override
                    public Bitmap apply(Bitmap bitmap) throws Throwable {
                        Bitmap btm = createWatermark(bitmap, "碧云天");
                        Log.d(TAG, "map two = " + Thread.currentThread());
                        return btm;
                    }
                })
                // 注释 4 订阅的时候切换线程,只是第一次设置有效
                .subscribeOn(Schedulers.io())
                // 注释 5 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
                .observeOn(AndroidSchedulers.mainThread())
                //  注释 6
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Throwable {
                        mImage.setImageBitmap(bitmap);
                        Log.d(TAG, "map three = " + Thread.currentThread());
                    }
                });
    }

大概是这个模样:Observable.just().map().map().subscribeOn().observeOn().subscribe()。很简单,第一步下载图片(第一个map操作符),第二步给图片加文字(第二个map操作符),最后显示图片(subscribe参数里)。用法就不多说了,这次主要分析源码,下面来瞧一瞧。

2、源码分析
上面的使用方法线程切换的操作符也一并加上了,线程切换的部分源码待会儿下文会讲解,现在先讲事件订阅及变换流程。

这里先背一下书,上面例子中的事件流程分析会主要涉及到几个重要的类:ObservableJust、ObservableMap。主要分析1个 just操作符 和 2个 map操作符的实现。我们一个个来,先看 just() 方法的调用都干了啥:

// Observable.java
 public static <T> Observable<T> just(@NonNull T item) {
        Objects.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<>(item));
    }

我们看上面方法返回处,出现了框架中高频调用的 RxJavaPlugins.onAssembly() 这个方法,我们不用看它了,就关注参数里面的这个 new ObservableJust<>(item)对象就好。
我们看下ObservableJust这个类是个啥:

// ObservableJust.java
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T get() {
        return value;
    }
}

好了,上面看到了。ObservableJust继承自 被观察者基类 Observable。也就是说,我们上面基本用法处Observable.just(path)的 just 方法调用之后,返回来一个 Observable 被观察者对象。其实,看到链式调用我们不难理解,在建造者设计模式中,每次调用一个方法之后,都会将当前对象返回。而这里的建造者模式则略微不同,这里每次调用一个操作符,都会创建一个新的对象。只不过这些对象的类型全都继承自同一个基类。不信我们再往下看 调用 map 操作符都干了啥。上面基本使用处,我们点进第一个 map() 方法:

Observable.java
 @NonNull
    public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

同样的,我们只关注方法返回值处的对象参数:new ObservableMap<>(this, mapper)。我们点开看一看 ObservableMap这个类 :

// ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
..........

可以看到,上面 ObservableMap类继承了 AbstractObservableWithUpstream类。而 AbstractObservableWithUpstream这个类最终也是继承了被观察者基类 Observable的:abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>{}。这里就不过多解释。也就是说,上面这里我们调用了 map() 操作符方法后,返回了一个新的被观察者 Observable对象,这个对象的类型是ObservableMap。
好,链式方法稍微总结一下:
just() -> 返回 ObservableJust对象(ObservableJust继承自 Observable)
map() -> 返回 ObservableMap对象(ObservableMap 继承自 Observable)
那么,上面的链式调用我们已经看了 just() 和 map()的返回值,相信下一个 map()返回了啥就不用讲了。
现在我们看最后一步订阅的方法 subscribe() 干了啥:

// Observable.java
 public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            // 注释 7 ,订阅方法链式调用
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
        }
    }

我们直接跳过了很多重载方法,看到了上面这个方法。别的部分不用看,就看注释 7 处 subscribeActual(observer);这个方法干了啥。好了,不要再点进去啦,再点进去就进死胡同了~~。在Observable.java基类里面,subscribeActual这个方法是一个抽象方法。所以我们要找到它的子类对象的重写方法。上一个环节调用的操作符是一个 map符。所以我们找到子类 ObservableMap.java.看看里面它重写的 subscribeActual()这个方法都干了啥:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // 注释 8,调用上有被观察者 Observable对象的订阅方法 subscribe()
       // source 是上有被观察者对象,
        source.subscribe(new MapObserver<T, U>(t, function));
    }
.......

看上面注释 8,恍然大悟。原来订阅方法 subscribe() 也是层层嵌套调用的,而且是从下游往上游调用。那么它要调用到哪个位置呢?不用说,肯定是第一个生成的被观察者对象处啊。上面基本用法中,第一个生成被观察者对象在哪?当然是 just()方法返回值处啊。接下来我们看看第一个被观察者对象 ObservableJust,它的最终订阅方法 subscribeActual()干了啥:

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        // 注释 9
        sd.run();
    }
......

上面可以看到,subscribeActual方法调用之后,从下游订阅方法传上来的观察者经过包装后,在注释 9处调用了 run() 方法。我们点进这个方法看一下:

 @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                // 注释 10 ,调用下游传上来的观察者 observer开始向下发送事件
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

好了,看到上面注释10的地方终于知道订阅方法自下而上调用,到了尽头要干嘛了。这是要自上而下发射事件,从而保证事件的发射顺序与操作符调用顺序一致。
下面画个草图记录一下大概的一个事件流程:


RxJava.png
二、线程切换

下面我们来看看文章开头处基本用例的线程调度:

// 注释 11, 订阅的时候切换线程,只有第一次设置有效
.subscribeOn(Schedulers.io())
 // 注释 12 , 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
.observeOn(AndroidSchedulers.mainThread())

订阅线程切换
我们先看注释 11 处,订阅时候的线程切换。首先看一下 Schedulers.io()这是个什么小东西,返回什么:

//Schedulers.io().java
 @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
-----------------------------------------------------------------
@NonNull
    static final Scheduler IO;
------------------------------------------------------------------
 IO = RxJavaPlugins.initIoScheduler(new IOTask());
------------------------------------------------------------------
 static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }
------------------------------------------------------------------------
static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
------------------------------------------------------------------------

源码拐了好几个弯,所以上面按顺序把片段贴出来。看上面最后一截,我们知道.subscribeOn(Schedulers.io())的 Schedulers.io() 返回了一个 new IoScheduler()对象。我们再看一下 IoScheduler 这个类:

// IoScheduler.java
public final class IoScheduler extends Scheduler {
...........................
//  注释 13 ,createWorker()方法
 @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
.............................

上面省略了一大堆,只保留了一个 createWorker()方法。另外,我们知道了 IoScheduler 是线程调度器 Scheduler派生的一个子类。现在知道这两点就行了,先码住,下面有用。
我们再回到上面线程调度的地方:

.subscribeOn(Schedulers.io())

下面看 subscribeOn这个方法干了啥:

// Observable.java
  public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

老规矩,别的不看。我们看一下上面方法返回值的参数 new ObservableSubscribeOn<>(this, scheduler),点进 ObservableSubscribeOn看一下:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);
       // 注释 14 , scheduler.scheduleDirect(new SubscribeTask(parent))
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

又是被观察者 Observable派生的一个子类 ObservableSubscribeOn。我们先看一下 new SubscribeTask(parent)这个对象是个啥,点进去看一下:

 final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
           // 执行上游被观察者的订阅方法
            source.subscribe(parent);
        }
    }

看上面,SubscribeTask 是一个 Runnable,里面的run() 方法执行了上游被观察者的订阅方法。这有何目的?我们 再往下看上面注释 14 处scheduler.scheduleDirect(new SubscribeTask(parent)) 方法参数还带一个Runnable对象要干啥:

 // Scheduler.java
@NonNull
   public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       // 注释  15 ,createWorker()方法
       final Worker w = createWorker();

       final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

       DisposeTask task = new DisposeTask(decoratedRun, w);
      // 注释 16, 好像 schedule在执行什么
       w.schedule(task, delay, unit);

       return task;
   }

看注释16这个w.schedule的操作好像是在放什么大招,但一脸懵逼,上面注释 15处 createWorker()方法点进去是个抽象方法。那 w 是啥?咦?createWorker是不是很熟悉?对咯,上面有说到,还码住了。既然当前在 Scheduler基类,createWorker又是抽象方法。想当初用户又传进来 Schedulers.io() 这么一个对象(Schedulers.io == new IoScheduler() )。而 IoScheduler又是派生的子类,那 createWorker()当然要在子类 IoScheduler里找啊。好吧,再贴以下:

// IoScheduler.java
public final class IoScheduler extends Scheduler {
...........................
//  注释 13 ,createWorker()方法
 @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
.............................
static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            // 注释 17 放大招
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

好,话不多说,createWorker()方法返回一个 EventLoopWorker对象。我们看一下上面 EventLoopWorker这个类,看上面注释 17的地方好像又在放大招。我们点进threadWorker.scheduleActual()这个方法看一下:

// 注释 18 ,ScheduledExecutorService 
private final ScheduledExecutorService executor;
 @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            if (delayTime <= 0) {
                // 注释 19
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }

OK,看上面注释 18的变量和注释 19处的操作,是不是似曾相识?没错,线程池ScheduledExecutorService 。就是之前把上游被观察者对象的订阅方法的执行打包进了一个 Runnable,经过层层流转之后掉进了这个线程池里了...

观察者线程切换

 // 注释 12 , 观察的时候切换,可以调用多次,切换之后下游观察者执行在该线程
.observeOn(AndroidSchedulers.mainThread())

上面注释 12处,我们还是先点进 AndroidSchedulers.mainThread()看一下这个主线程是怎么来的:

public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
----------------------------------------------------------------------------
private static final Scheduler MAIN_THREAD =
        RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
-------------------------------------------------------------------------------
private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }

又是拐了几个弯。好了,看上面就行了。Looper.getMainLooper()、Handler...
主线程的 Handler被打包进线程调度器 Scheduler 里,估计又是一轮操作猛如虎的参数层层传递。我们开始点进上面注释 12处,看一下 observeOn方法把主线程的Handler传到哪,要干嘛:

  public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
-------------------------------------------------------------------------------
 public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }
----------------------------------------------------------------------------------
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
  void schedule() {
            if (getAndIncrement() == 0) {
               //  注释 20 
                worker.schedule(this);
            }
        }

又是一顿操作猛如虎,看到上面注释 12我们还是再看看 当初包装主线程 Handler、继承自Scheduler的 HandlerScheduler的内部类Worker的schedule方法干了啥把:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;
   ..............
    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;
        ...................
        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            // 注释 21 ,handler
            handler.sendMessageDelayed(message, unit.toMillis(delay));
.................

好了,别的不看了。看上面注释 21。主线程 handler,老一套了......

相关文章

网友评论

      本文标题:RxJava 的基本使用及源码分析(订阅流程+线程切换)

      本文链接:https://www.haomeiwen.com/subject/qnuspltx.html