美文网首页
Rxjava 切换线程

Rxjava 切换线程

作者: 向前的zz | 来源:发表于2021-01-11 11:34 被阅读0次

根据自己的理解,公司用的还是Rxjava 1.x,不过很久以前使用过,没有过多注意这一块,不过还是想弄一下,周末的时候,主要看了 简单使用,如何切换线程,并且去理解这个过程

操作符什么的后面理解的时候,再谈论,先进行就简单的create。重点切换线程

一、准备

查看源码的版本:

implementation 'io.reactivex:rxjava:1.2.1'

这边先不纠结于 Rxjava 2.x,这个可能后续去看Rxjava2.x的时候,再去讨论了。

二、进入正题

2.1 简单使用

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("Observable.OnSubscribe : " + Thread.currentThread());
        subscriber.onNext("sss");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.io())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s + " " + Thread.currentThread());
            }
        });

简单就是这么的使用,打印如下:

Observable.OnSubscribe: Thread[RxNewThreadScheduler-1,5,main]
sss Thread[RxIoScheduler-2,5,main]

2.2 查看subscribeOn的源码

//rx.Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    //1
    //rx.Observable 里面的成员create方法
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}
  1. 这里直接创建了一个OperatorSubscribeOn, 然后传递了一个 this,然后把scheduler线程切换

    这里注意,这个 this 我感觉比较关键,把当前的Observable对象传递了进去;当前OperatorSubscribeOn还是通过 Observablecreate()方法创建Observable对象,也就是说,OperatorSubscribeOn是一个Observable.OnSubscribe对象,并且里面含有上一个Observable对象

进入到 OperatorSubscribeOn类中

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    //成员变量
    final Scheduler scheduler;
    final Observable<T> source;
        
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        //1.
        subscriber.add(inner);
        inner.schedule(new Action0() {
            @Override
            public void call() {
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }
                    //...省略代码
                };
              //2.
                source.unsafeSubscribe(s);
            }
        });
    }
}
  1. 其实就是从scheduler中创建一个任务,然后通过schedule()方法进行执行。

    我是这么理解的,相当于一个线程池执行

    //inner.schedule()类比下面
    ThreadPool.execute(new Runnable() {
        @Override
        public void run() {
             //执行任务
        }
    };
    

    具体是scheduler先不再这个讨论,我这边现在也还没再看(因为我是个菜鸡)

  2. source.unsafeSubscribe(s)这段语句相当于source.subscribe(s),因为源码里面调用subscribe(s)的时候也把我们的Subscriber对象转成了SafeSubscriber的对象。

到这里发现,subscribeOn相当于下面代码

private Observable<T> source;

ThreadPool.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    @Override
                    void onError(Throwable t) {
                        sub.onError(t);
                    }

                    @Override
                    void onNext(T t) {
                        sub.onNext(t);
                    }

                    @Override
                    void onCompleted() {
                        sub.onCompleted();
                    }
                };
        source.subscribe(subscriber);
    }
};

2.3 分析为什么subscribeOn多次调用只有第一次有用

其实 2.2 查看subscribeOn的源码已经得出了结论,是在线程里面通过代理了上一个的Observable<T>对象,也就是说,上游的被当前线程池里面的线程接管了。

XXX.subscribeOn(Schedulers.AAA).subscribeOn(Schedulers.BBB).subscribe()

就拿这两层来说,因为本身subscribeOn就是代理上一个的Observable<T>对象,从后面往前面看,就是说,BBB这个线程池,要给前一个AAAObservable<T>对象,如下伪代码:

private Observable<T> sourceAAA;

ThreadPoolBBB.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //这个是BBB线程的执行
                        sub.onNext(t);
                    }
                            //...省略
                };
        sourceAAA.subscribe(subscriber);
    }
};

AAA的前面是XXX,那伪代码应该如下:

private Observable<T> sourceXXX;

ThreadPoolAAA.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //这个是AAA线程的执行
                        sub.onNext(t);
                    }
                            //...省略
                };
        sourceXXX.subscribe(subscriber);
    }
};

好了,上面好像还是看不太出来,然后我们组合一下伪代码来看看

private Observable<T> sourceAAA;

ThreadPoolBBB.execute(new Runnable() {
    @Override
    public void run() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                    //...省略
                    @Override
                    void onNext(T t) {
                        //这个是BBB线程的执行
                        sub.onNext {
                                //伪代码迁移,内部执行
                                     private Observable<T> sourceXXX;

                            ThreadPoolAAA.execute(new Runnable() {
                                @Override
                                public void run() {
                                    Subscriber<T> subscriber = new Subscriber<T>() {
                                                //...省略
                                                @Override
                                                void onNext(T t) {
                                                    //这个是AAA线程的执行
                                                    sub.onNext(t);
                                                }
                                                //...省略
                                            };
                                    sourceXXX.subscribe(subscriber);
                                }
                            };
                        }
                    }
                            //...省略
                };
        sourceAAA.subscribe(subscriber);
    }
};

简化一下代码,用线程代替就是,相当于如下:

new Thread("BBB") {
  @Override
  public void run() {
    new Thread("AAA") {
      @Override
      public void run() {
        System.out.println("" + Thread.currentThread());
      }
    }.start();
  }
}.start();

打印可想而知,是里面 AAA线程;

Thread[AAA,5,main]

2.4 查看observerOn的源码

//rx.Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //...
            //2.
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            //1.
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

好家伙,看了一下,连续调用了4层,最总还是通过create()创建了一个Observable<T>对象,

  1. OnSubscribeLift和前面看subscribeOn()一样,第一个参数onSubscribe是上一个Observable<T>对象的onSubscribe,也就是说,subscribeOn()observerOn()的比较大的区别是,subscribeOn()“代理的是上一个对象的Observable<T>”,而observerOn()是“代理的是上一个的OnSubscribe<T>”,所以observerOn()就是改变了下游的线程切换。第二个参数是上一步生成的OperatorObserveOn进行了传递到里面。
  2. scheduler的对象传给了OperatorObserveOn,这个类干了切换换线程的操作。

可以看出observerOn()通过两个类(OperatorObserveOnOnSubscribeLift)来管理,也不知道是为什么,是不是有可能方便单元测试,还是为了单一职责呢,就不纠结于此了。如果我写可能就一个类做了。

2.4.1 先看OnSubscribeLift

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
      //...
      //1.
      Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
      try {
        // new Subscriber created and being subscribed with so 'onStart' it
        st.onStart();
        parent.call(st);
      } catch (Throwable e) {
        //...
      }
      //...
    }
}
  1. Operator#callOperatorObserveOn)然后通过Subscriber参数来生成了一个新的Subscriber然后通过上一个的OnSubscribe<T>对象传递。

2.4.2 OperatorObserveOn

这个类比较长,分开来了,

public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
  //...略
@Override
  public Subscriber<? super T> call(Subscriber<? super T> child) {
     //...略
    // 1.
      ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
      parent.init();
      return parent;

  }
   //...略
  
}
//...略
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
  //...略
}
  1. 接着 2.4.1Operator#call,里面就调用了OperatorObserveOn的内部类ObserveOnSubscriber进行把上一个的Subscriber代理了一下,然后具有了有了切换线程的能力
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> child;
    final Scheduler.Worker recursiveScheduler;
    final Queue<Object> queue;
    //...略
    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
        this.child = child;
        this.recursiveScheduler = scheduler.createWorker();
        if (UnsafeAccess.isUnsafeAvailable()) {
            queue = new SpscArrayQueue<Object>(calculatedSize);
        } else {
            queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
        }
    }
  //...略
    void init() {
        Subscriber<? super T> localChild = child;
        localChild.add(recursiveScheduler);
        localChild.add(this);
    }

    
    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        //1.
        if (!queue.offer(NotificationLite.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }

    @Override
    public void onCompleted() {
      //1.
        if (isUnsubscribed() || finished) {
            return;
        }
        finished = true;
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        //1.
        if (isUnsubscribed() || finished) {
            RxJavaHooks.onError(e);
            return;
        }
        error = e;
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        //...略
        for (;;) {
            long requestAmount = requested.get();
              //...略
            while (requestAmount != currentEmission) {
                Object v = q.poll();
                boolean empty = v == null;

                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }

                if (empty) {
                    break;
                }
                                //1.
                localChild.onNext(NotificationLite.<T>getValue(v));
            }
                      //...略
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }
            //...略
        }
    }

    boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
      if (e != null) {
        //1.
             a.onError(e);
        return true;
      } else {
        //1.
            a.onCompleted();
        return true;
      }
      return false;
    }
}

上面已经省去了很多代码

  1. 就是通过传入进来的childSubscriber)对象,然后在外面调用 Subscriber#onNext()方法的时候,实际就进行了 queue 入队操作,然后通过recursiveScheduler.schedule(this)切换到线程中执行

    if (!queue.offer(NotificationLite.next(t))) {
      onError(new MissingBackpressureException());
      return;
    }
    schedule();
    
    protected void schedule() {
      if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
      }
    }
    //recursiveScheduler.schedule(this); 最终执行到这里
    public void call() {
      
    }
    

    recursiveScheduler.schedule(this);最终执行了call()方法,从而达到了且换线程的目的。

    我根据根据这个思路写了伪代码

    static final class ObserverOnSubscribe<T> extends Subscriber<T> implements Runnable {
        private Scheduler scheduler;
        private Subscriber<T> child;
        private Queue<T> queue;
    
        public ObserverOnSubscribe(Scheduler scheduler, Subscriber<T> child) {
            this.scheduler = scheduler;
            this.child = child;
    
            queue = new LinkedList<>();
        }
    
        @Override
        public void run() {
            if (queue.isEmpty()) {
                child.onCompleted();
                return;
            }
            T poll = queue.poll();
            child.onNext(poll);
        }
         //...
    
        @Override
        void onNext(T t) {
            queue.offer(t);
            schedule();
        }
    
        @Override
        void onCompleted() {
            schedule();
        }
    
        public void schedule() {
            scheduler.execute(this);
        }
    }
    

2.5 observerOn每次调用,后面都会进行线程切换,但不会影响前面的线程

observerOn每次调用,后面都会进行线程切换,但不会影响前面的,因为observerOn“代理了”OnSubscribe,就相当于影响下游的切换。好比如下代码

new Thread("BBB") {
    @Override
    public void run() {
        System.out.println("" + Thread.currentThread());
        
        new Thread("AAA") {
            @Override
            public void run() {
                System.out.println("" + Thread.currentThread());
            }
        }.start();
    }
}.start();

总结

subscribeOn :

  1. 整条线路上第一次切换的有效,但有效的范围分为

    a. 有ObserverOn的话,那就是ObserverOn的下下一个,因为ObserverOn只影响下一个

    b. 如果只用了subscribeOn,那就是subscribeOn第一次切换的那个线程,然后知道ObserverOn来进行切换

<img src="http://reactivex.io/documentation/operators/images/schedulers.png" alt="">

这个代理的是 Observrable 对象

//thread-1
thread {
   Observeable#subscribe("创建了一新的");
}

为什么subscribeOn重复设置没有使用,只有第一次呢?

就像这样

//thread-2
thread {
  //thread-1
   thread {
   Observeable#subscribe("创建了一新的");
    }
}

这样读取出来其实还是thread-1,thread-2就没什么意义了。

observeOn:影响下一次订阅

observeOn代理的是Observable.OnSubscribe这个接口,就是onNext的这个接口,往下面传的这个接口,所以会影响下面的线程。

//thread-2
thread {
  //thread-1
   thread {
   Observeable#subscribe(() -> {
        Run run = this.run;
        thread {
        run.run();
      }
   });
    }
}

以上,如果有错误,非常感谢您的指正,分享就是为了学习,写错了,虽然丢脸,但是能学到东西也很舒服。

区别:subscribeOn()observerOn()的比较大的区别是,subscribeOn()“代理的是上一个对象的Observable<T>”,而observerOn()是“代理的是上一个的OnSubscribe<T>”,所以observerOn()就是改变了下游的线程切换。

Rxjava线程切换简单的实现

相关文章

  • RxJava源码分析-线程切换

    RxJava源码分析-线程切换 接着上篇分析,本篇我们来揭开RxJava线程切换的神秘面试,先上一段代码 这段代码...

  • RxJava的线程切换

    RxJava 线程切换 前言 在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。...

  • Rxjava2 操作符原理(2)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 线程切换(3)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 基本用法(1)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 简析Flowable背压(4)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • RxJava源码分析之线程调度(一)

    RxJava强大的地方之一是他的链式调用,轻松地在线程之间进行切换。这几天也大概分析了一下RxJava的线程切换的...

  • RxJava:线程切换

    上一篇:RxJava:基本订阅流程 我们在Rxjava中最常用的两个方法: subscribeOn(Schedul...

  • Rxjava 切换线程

    根据自己的理解,公司用的还是Rxjava 1.x,不过很久以前使用过,没有过多注意这一块,不过还是想弄一下,周末的...

  • 安卓第三方组件收集

    要点:如果不是必须, 用系统控件 RxJava 线程切换需要注意的地方 RxJava 内置的线程调度器的确可以让我...

网友评论

      本文标题:Rxjava 切换线程

      本文链接:https://www.haomeiwen.com/subject/aqjlaktx.html