初识Rxjava

作者: 翻身不做咸鱼 | 来源:发表于2018-04-25 00:41 被阅读199次

星球话题:用过RxJava和RxAndroid吗?RxAndroid切换线程是怎么实现的呢?

去年知乎上参加了玉刚的Live,听大神讲解职业规划。随后入了微信群,去年11月份也加入主席的星球。由于去年十一月份刚好接了外包工作,比较忙就忽略星球的任务,说来惭愧,到现在还没有交过一次作业。再加上今年年初想换工作,就忙于复习,星球的作业就落下,希望从今天开始,把作业补回来。

年初也去试水,发觉现在android的要求真的是高。可能也是自己比较菜吧,试了三家没有拿到offer。今年计划,好好复习安卓知识,学点RN、小程序、PWA,争取拿到好的offer。女朋友说我,晚上想了千万条路,隔天起来走原路。哎,反正还是得脚踏实地,一步一步学习。说干就干,下边我们开始学习。

(1)RxJava 基本概念

1、Observable (可观察者,即被观察者)

2、Observer (观察者)

3、subscribe (订阅)、事件

4、Scheduler 调度器,相当于线程控制器

Rxjava 实现
1、创建Observable(被观察者):

     mObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("2018年");
                subscriber.onNext("CBA");
                subscriber.onNext("辽宁队");
                subscriber.onNext("夺冠");
                subscriber.onCompleted();
            }
        });

这里传入了一个OnSubscribe对象作为参数。OnSubscribe会被存储在返回的 Observable对 象中,它的作用相当于一个计划表,当Observable被订阅的时候,OnSubscribe的call()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用四次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递。

2、创建Observer(观察者):

    mObserver = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogUtil.d("onCompleted:");
            }
            @Override
            public void onError(Throwable e) {
                LogUtil.d("onError:"+e);
            }

            @Override
            public void onNext(String s) {
                LogUtil.d("onNext:"+s);
            }

        };

Subscriber是实现Observer的抽象类,用法也一样:

    mSubscriber = new Subscriber() {
            @Override
            public void onCompleted() {
                LogUtil.d("onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtil.d("onError:"+e);
            }

            @Override
            public void onNext(Object o) {
                LogUtil.d("onNext:"+o);
            }
        };

Subscirber与Observe 的区别是:
1、onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之 前被调用,可以用于做一些准备工作。
2、unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。

3、subscribe()订阅:

 mObservable.subscribe(mObserver);
或者
mObservable.subscribe(mSubscriber);

以上1、2、3过程也可以写成:

   Observable.create(new Observable.OnSubscribe<String>() {

          @Override
          public void call(Subscriber<? super String> subscriber) {
              subscriber.onNext("2018年");
              subscriber.onNext("CBA");
              subscriber.onNext("辽宁队");
              subscriber.onNext("夺冠");
              subscriber.onCompleted();
          }
      }).subscribe(new Observer<String>() {
           @Override
           public void onCompleted() {
               LogUtil.d("onCompleted:");
           }
           @Override
           public void onError(Throwable e) {
               LogUtil.d("onError:"+e);
           }

           @Override
           public void onNext(String s) {
               LogUtil.d("onNext:"+s);
           }

       });

结果:


Rxjava结果.png

(2)Rxjava常见操作符
下边我们来了解一下Rxjava 常见操作符:
just:将传入的参数依次发送出来

Observable observable = Observable.just("2018年", "CBA", "辽宁队","夺冠");
// 将会依次调用:
// onNext("2018年");
// onNext("CBA");
// onNext("辽宁队");
// onNext("夺冠");
// onCompleted();

from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出:

String[] words = {"2018年", "CBA", "辽宁队","夺冠"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("2018年");
// onNext("CBA");
// onNext("辽宁队");
// onNext("夺冠");
// onCompleted();

(3)Rxjava如何切换线程:Scheduler
我们回到文章开头,RxAndroid切换线程是怎么实现的呢?看下边一个例子:

Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {
                LogUtil.d("subscriber:");
                subscriber.onNext("2018年");
                subscriber.onNext("CBA");
                subscriber.onNext("辽宁队");
                subscriber.onNext("夺冠");
                subscriber.onCompleted();
            }
        })
                 .subscribeOn(Schedulers.newThread())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribe(new Observer<String>() {
             @Override
             public void onCompleted() {
                 LogUtil.d("onCompleted:");
             }
             @Override
             public void onError(Throwable e) {
                 LogUtil.d("onError:"+e);
             }

             @Override
             public void onNext(String s) {
                 LogUtil.d("onNext:"+s);
             }

         });

subscribeOn(): 指定subscribe()所发生的线程,即 Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
我们来了解一下Scheduler:
在不指定线程的情况下,RxJava遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器),下面是Scheduler的API:

1、Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
2、Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
3、Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
4、Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

那么我们有个疑问,Rxjava内部是如何切换线程?首先我们来看subscribeOn()

subscribeOn()源码:

  public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

我们看到参数是传入Scheduler 调度器,然后创建了新的Observable,我们看到OperatorSubscribeOn这个对象,OperatorSubscribeOn原始Observable对象和调度器scheduler,那么这个OperatorSubscribeOn是什么呢,我们看下源码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;   //调度器
    final Observable<T> source; //原始Observable

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }
//①.原始观察者订阅了新的Observable后,将执行此call方法
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
          //②. call方法中使用传入的调度器创建的Worker对象的schedule方法切换线程
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                  //③ .创建了一个新的观察者
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                               //⑤. 新的观察者收到数据后直接发送给原始观察者
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                   //⑤. 新的观察者收到数据后直接发送给原始观察者
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                //④. 在切换的线程中,新的观察者订阅原始Observable,用来接收数据
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn是实现Observable的OnSubscribe 接口

  public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
public interface Action1<T> extends Action {
    void call(T t);
}

上面源码中注释已经写的很清楚了,OperatorSubscribeOn字意上来理解,Operator操作员,相当于媒介,为新的Observable发射数据。它创建了一个新的观察者订阅原始Observable,这样就可以接受原始Observable发射的数据,然后直接发送给原始观察者。
所以OperatorSubscribeOn也是间接实现了Action1,我们来看OperatorSubscribeOn在call()方法里边操作了什么。在call方法中通过scheduler.createWorker().schedule()完成线程的切换,这里就牵扯到两个对象了,Scheduler和Worker。Scheduler是个抽象类,是从外边传进来的。我们就看一个简单的Schedulers.newThread(),其他也是从类似,下面一步一步看源码:

/**
 * Static factory methods for creating Schedulers.
 */
public final class Schedulers {
  //各种调度器对象
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;

    private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

         ......
   //构造方法
    private Schedulers() {
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

          ......

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
       //①.创建newThreadScheduler对象
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
}

 //②. 获取NewThreadScheduler对象
 public static Scheduler newThread() {
        return getInstance().newThreadScheduler;
    }

Schedulers中保存了几个调度器对象,在Schedulers被加载的时候,他们就被初始化了,Schedulers就像是一个调度器的控制器,跟踪newThreadScheduler,看到newThreadScheduler在RxJavaSchedulersHook.createNewScheduler()实例化。CTRL+鼠标左键跟createNewScheduler()方法进去,最终调到NewThreadScheduler(ThreadFactory threadFactory)的方法:

**
 * Schedules work on a new thread.
 */
public final class NewThreadScheduler extends Scheduler {
    private final ThreadFactory threadFactory;

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewThreadScheduler就是我们调用subscribeOn(Schedulers.newThread() )传入的调度器对象,每个调度器对象都有一个createWorker方法用于创建一个Worker对象,而NewThreadScheduler对应创建的Worker是一个叫NewThreadWorker的对象,在新产生的OperatorSubscribeOn计划表中就是通过NewThreadWorker.schedule(Action0)实现线程的切换,下面我们跟踪schedule(Action0)方法:

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;   //
    public NewThreadWorker(ThreadFactory threadFactory) {
        //创建一个线程池
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        executor = exec;
    }
    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }
    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        return scheduleActual(action, delayTime, unit);
    }
    //重要:worker.schedule()最终调用的是这个方法
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        //return action;
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        //ScheduledAction就是一个Runnable对象,在run()方法中调用了Action0.call()
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);   //将Runnable对象放入线程池中
        } else {
            f = executor.schedule(run, delayTime, unit);  //延迟执行
        }
        run.add(f);

        return run;
    }
    ...
}

我们发现OperatorSubscribeOn计划表中通过NewThreadWorker.schedule(Action0),将Action0放入到一个线程池中执行,这样就实现了线程的切换。

多次subscribeOn()的情况:
我们发现,每次使用subscribeOn都会产生一个新的Observable,并产生一个新的计划表OnSubscribe,目标Subscriber最后订阅的将是最后一次subscribeOn产生的新的Observable。在每个新的OnSubscribe的call方法中都会有一个产生一个新的线程,在这个新线程中订阅上一级Observable,并创建一个新的Subscriber接受数据,最终原始Observable将在第一个新线程中发射数据,然后传送给给下一个新的观察者,直到传送到目标观察者,所以多次调用subscribeOn只有第一个起作用(这只是表面现象,其实每个subscribeOn都切换了线程,只是最终目标Observable是在第一个subscribeOn产生的线程中发射数据的)。
 多次subscribeOn()只有第一个会起作用,后面的只是在第一个的基础上在外面套了一层壳,就像下面的伪代码,最后执行是在第一个新线程中执行:

...
//第3个subscribeOn产生的新线程
new Thread(){
    @Override
    public void run() {
        Subscriber s1 = new Subscriber();
        //第2个subscribeOn产生的新线程
        new Thread(){
            @Override
            public void run() {
                Subscriber s2 = new Subscriber();
                //第1个subscribeOn产生的新线程
                new Thread(){
                    @Override
                    public void run() {
                        Subscriber<T> s3 = new Subscriber<T>(subscriber) {
                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }
                            ...
                        };
                        //①. 最后一个新观察者订阅原始Observable
                        原始Observable.subscribe(s3);
                        //②. 原始Observable将在此线程中发射数据

                              //③. 最后一个新的观察者s3接受数据

                              //④. s3收到数据后,直接发送给s2,s2收到数据后传给s1,...最后目标观察者收到数据
                         } 
                }.start();
            }
        }.start();
    }
}.start();

observeOn原理:
observeOn调用的是lift操作符,lift操作符。lift有点难理解,简单点说就是在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

observeOn一样创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNext、onCompete、onError方法。

我们看看observeOn操作符的源码:

public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {
    private final Scheduler scheduler;
    //创建代理观察者,用于接收上一级Observable发射的数据
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    //代理观察者
    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final Queue<Object> queue;
        //接受上一级Observable发射的数据
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        @Override
        public void onCompleted() {
            ...
            schedule();
        }
        @Override
        public void onError(final Throwable e) {
            ...
            schedule();
        }
        //开启新线程处理数据
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }
        // only execute this from schedule()
        //在新线程中将数据发送给目标观察者
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            for (;;) {
                while (requestAmount != currentEmission) {
                    ...
                    localChild.onNext(localOn.getValue(v));
                }
            }
        }
    }
}

ObserveOnSubscriber代理观察者相当于微信代理商,让代理商帮忙买onNext,onCompleted,onError,代理商分别代表你进行购买。我们看到Worker类的 recursiveScheduler执行recursiveScheduler.schedule(this),回调到方法call中。call可以由传进来的schedule 实现线程切换。就像上边的NewThreadWorker.schedule(Action0)一样。
代理的OnSubscribe中的call方法就是让代理Subscriber订阅上一级Observable,直到订阅到原始Observable发射数据,代理Subscriber收到数据后,可能对数据做一些操作,然后将数据传送给下一级Subscriber,直到目标观察者接收到数据,目标观察者在那个线程接受数据取决于上一个Subscriber在哪一个线程调用目标观察者。
嗯,本人技术有限,也是参考以下文章学习的。也是希望今年能够耐心、坚持学下去吧。希望在2018年想找个好工作的小伙伴们,共勉、坚持!
给 Android 开发者的 RxJava 详解
https://blog.csdn.net/xmxkf/article/details/51821940

相关文章

  • RxAndroid/RxJava之初识RxAndroid简单方法

    RxAndroid/RxJava之初识RxAndroid简单方法示例 RxJava ? RxJava 主要的作用就...

  • 初识RxJava

    定义 RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asy...

  • 初识Rxjava

    星球话题:用过RxJava和RxAndroid吗?RxAndroid切换线程是怎么实现的呢? 去年知乎上参加了玉刚...

  • 初识RxJava

    什么是RxJava? 这个名词包含两部分: Rx(是ReactiveX、Reactive Extensions、R...

  • 初识rxJava

    一直不知道如何下手写技术相关的文章,但好歹也是做技术的,总要踏出第一步,各位看官不喜勿喷。刚好前段时间被rxJav...

  • 初识RxJava

    本文出自 “阿敏其人” 简书博客,转载或引用请注明出处。 开篇,自不必说,祭上经典好文 给 Android 开发者...

  • 初识Rxjava

    作为一个Android小白,只有不断的学习才能体会到一个码农的快乐,前段时间看到有人在讨论Rxjava,就好奇看一...

  • Rxjava学习:初识Rxjava

    前言 在过去的一年里,出现大量优秀的框架,火的不行,我也是从网上了解到Rx,特意搜索了相关文章,我也是通过它初步了...

  • RxJava2.0的使用详解

    RxJava2.0的使用详解 1,初识RxJava RxJava就是一种用Java语言实现的响应式编程,来创建基于...

  • Rxjava学习:谈谈Rxjava的使用

    前言 在 Rxjava学习:初识Rxjava 一文中,我们对Rxjava的几个基本概念做了介绍,今天就来谈谈对Rx...

网友评论

    本文标题:初识Rxjava

    本文链接:https://www.haomeiwen.com/subject/dfrclftx.html