冷饭热炒——RxJava

作者: nick_young | 来源:发表于2018-05-14 18:32 被阅读66次

已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们写烂了的RxJava。这篇文章主要分析RxJava事件的产生以及变化的原理,Ok,let's go!

0. 前言

本次源码分析使用的是RxJava2,版本2.1.14。RxJava1和RxJava2区别还是很大的,今天去github上看了下,RxJava1在三月底就停止更新了。

1. 关于RxJava

套用扔物线大神的话就是:a library for composing asynchronous and event-based programs by using observable sequences.(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。就简单说这句吧,详细还是看大神的文章吧。

2. Observable创建

在使用RxJava时,通常使用Observable.just(T t...)/Observable.create(ObservableOnSubscribe<T> source)来创建Observable对象,看其源码如下:

Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 判空
    ObjectHelper.requireNonNull(source, "source is null");
    // 通过RxJavaPlugins
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // 这个与hook有关,默认为null,所以返回值即传入值
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

整个创建过程其实就是返回了一个ObservableCreate对象,该对象如下:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 代理模式的运用
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // 调用了ObservableOnSubscribe的subscribe方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

ObservableCreate作为Observable的子类,实现了其subscribeActual方法,创建时需要传入接口ObservableOnSubscribe,该接口作为连接了观察者与被观察者。通过实现其subscribe方法,来通知观察者事件发生。
当然,我们也可以看到在subscribeActual方法中通过使用CreateEmitter作为观察者的代理类,用于控制观察这事件是否需要通知。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        // 传入值不能为null
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 如果没有被处理则调用
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    ......
}

CreateEmitter作为Observer的代理类,通知观察者时需要判断传入参数是否为空以及是否被处理来判断是否调用ObserveronNext(T t)方法。

同理,我们也可以知道通过just(T... t)也是通过这种方式创建,这里不详细讲了。

3. subscribe订阅

订阅方法作为观察者和被观察者连接的桥梁,通过该方法,我们可以获得被观察者的事件发送,以及接受该事件做出的响应。

Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // hook相关略过
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 判空
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 调用了Observable中的抽象方法subscribeActual,该方法的在ObservableCreate中实现
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ......
        throw npe;
    }
}

订阅方法更加的简单,调用了其抽象方法subscribeActual,该方法在ObservableCreate中实现。我们再看下subscribeActual代码:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 代理模式的运用
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        // 调用了ObservableOnSubscribe的subscribe方法
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

到这里整个事件发送就完全走通了:

  • 通过create方法创建Observable,传入匿名内部类ObservableOnSubscribe
  • 调用ObservableOnSubscribesubscribe方法,通过实现类中来发送数据。
  • 通过Observablesubscribe方法来将Observer订阅,在ObservableOnSubscribe调用了Observer的代理类CreateEmitter的方法来通知Observer的事件。
调用过程

emmm,用更简单的话说就是ObservableOnSubscribesubscribe方法调用了接口Observer的方法

4. map变换

前面讲了关于Observable的创建以及事件的发送,这些都只是基本操作。RxJava的强大是对事件流的各种操作,比如过滤、变化以及线程切换等。这里我们看看我们常用的map(Function<? super T, ? extends R> mapper)来分析一下吧!

Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // 创建了ObservableMap,这里作为一个新的Observable对象返回,返回的是转换后的泛型R
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    
    // 创建过程是需要传入第一个Observable以及一个Function接口
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    // 调用subscribe时需要重写的方法
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    // 转换的Observer
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        
        // 需要将实际的Observer传入
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            ......

            U v;

            try {
                // 这里调用了Function的apply方法,将T转成U
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 将转换后的U发送给Observer接收
            actual.onNext(v);
        }
        ......
    }
}

OK,源码就这些,代码量很少,理解起来也很容易。这里再总结下:

  • 创建新的Observable对象ObservableMap,并将当前Observable对象传入。
  • 调用新的Observable对象的subscribe方法,在ObservableMap中调用source.subscribe(new MapObserver<T, U>(t, function))
  • MapObserver实现了Observer接口,在其onNext方法中调用了Functionapply()方法,并且最终调用传入的ObserveronNext()方法。

如下图所示:


流程

5. 线程切换

RxJava的强大之处不只是体现在各个操作符,也体现在线程的切换。RxJava默认工作在当前线程中,如果需要发送事件产生在新的线程,接收并处理事件在另一个线程怎么办?RxJava给我们提供了两个方法subscribeOn()observerOn()
subscribeOn()是指事件产生的线程,该方法只会在第一次设置有效。而observerOn()是指定事件处理的的线程,每调用一次就会切换线程。下面还是分开来说吧。

5.1 subscribeOn分析

上面说到subscribeOn作用于事件产生,并且只有第一个设置有效,我我们分析下源码:

Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    // 这里还是老套路
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    // 新的Observer,最为原始Observer的代理
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        ......
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        ......
    }
    
    // 创建了subscribe任务,这里实现了runnable接口
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // source是上面传入的Observable对象
            source.subscribe(parent);
        }
    }
}

上面的步骤已经很熟悉了,通过该方法创建了一个新的Observable对象,最终执行subscribe()方法时调用了scheduler.scheduleDirect(new SubscribeTask(parent))SubscribeTask作为Runnable的实现类,所以我们最终会调用run()方法。其实到这里我们也就清楚为什么subscribeOn只有第一个设置有效了——不管创建多少个新的Observable对象,最终还会调用第一个Observable对象的subscribe方法,而该方法工作在该线程中!
我们接着看到底是怎么进行线程切换的(这里使用Schedulers.newThread())为例:

Schedulers.java
@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static {
    ......
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}

static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}

NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    // 线程池,可以实现循环或延迟任务。
    private final ScheduledExecutorService executor;

    volatile boolean disposed;
    
    public NewThreadWorker(ThreadFactory threadFactory) {
        // 创建方法
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // 创建一个新的Runnable,最终仍然调用传入的run方法,这里不去看了
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    ......
}

通过上面的代码,我们可以看到最终通过线程池调用SubscribeTaskrun方法,从而达到线程切换的目的。

5.2 observerOn实现

关于observerOn的实现其实和subscribeOn很像,只不过一个是在新的线程中调用source.subscribe(parent)方法;而observerOn是在新的线程调用相应的onNext\onError等方法,所以我们没调用一次observerOn就会切换一次线程,并且下面的操作都会工作在紧挨着该操作的observerOn所指定的线程中。

6. 总结

RxJava的基本原理就是这些,RxJava使用的较多的模式就是代理模式。整个源码不难(没有包括背压以及高级用法),用心看仔细梳理就能很好理解。我所认为的理解可能就是看懂了,然后可以根据它来仿写出一个相似的RxJava,这样我觉得是真正的掌握了。RxJava, Out!

相关文章

  • 冷饭热炒——RxJava

    已经超过一个月没有写文章了,原因无非就是工作太忙。最近终于恢复以前的节奏,任务开始正常了起来。忙里偷闲,写一写人们...

  • 猫说心智教育.冷饭热炒

    接连发生的虐童事件引发公众高度关注和愤怒,其中引出一个话题:资本介入教育,是教育堕落的根源(这里本来应该是一个问号...

  • ✍🏻炒个冷饭 - 草稿

    ②2015-02一次成版的现实与境界: 一次成版的概念本身难以说个啥,但是,是有局限性的,也就是说,是有前置条件的...

  • pytorch mnist 框架

    炒个冷饭……以后不能再不会写了!!

  • 随笔丨冷饭新炒

    今天给大家分享一个我听到的新词:战略性副业。所谓战略性副业,我们拆开来看,其战略性体现在,它是脱胎于主业或与主业相...

  • 冷饭炒了又炒,还好吃吗?

    上午送女儿的时候顺便送妈妈去打麻将。 妈一上车,我递给她一瓶姜茶。她拿在手上说只有一瓶带还是不带,因为以前每次都提...

  • 吃货日常

    蛋包饭 鱿鱼饭 重庆小面 热拌面 炒河粉

  • 炒鸡蛋饭的妈妈

    我已经可以很熟练地炒好一碗鸡蛋饭。 坐锅,倒油,油热,直接打两个鸡蛋进去,用锅铲搅碎,撒上一些盐,把隔夜的冷饭放进...

  • 歌词

    又是凌晨两点半,热了热冰箱里的冷饭. 说了晚安,但灯却没有暗. 又是凌晨两点半,热了热冰箱里的冷饭. 说了晚安,但...

  • 蛋炒饭只能用冷饭炒。

    我今年25岁,生活很无趣,上下班、打打球、和朋友吃吃逛逛、睡睡觉。 今天照常下班,准备回家自己...

网友评论

    本文标题:冷饭热炒——RxJava

    本文链接:https://www.haomeiwen.com/subject/cqftdftx.html