美文网首页
学习笔记RxJava

学习笔记RxJava

作者: 回眸婉约 | 来源:发表于2020-12-14 15:28 被阅读0次

    使用

    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello world");
                e.onComplete();
            }
        }).map(new Function<String, String>(){
            @Override
            public String apply(String s) throws Throwable {
                return s+"<<<";
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.e("onSubscribe","onSubscribe");
                    }
                    @Override
                    public void onNext(@NonNull String s) {
                        Log.e("onNext","onNext:"+s);
                    }
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e("onError","onError");
                    }
                    @Override
                    public void onComplete() {
                        Log.e("onComplete","onComplete" );
                    }
                });
    

    发射一个字符串,经过map转化,然后发到观察者onNext输出数据

    查看源码

    1、Observable.create(..)

      public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
          Objects.requireNonNull(source, "source is null");
          return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
      }
    

    接受一个ObservableOnSubscribe参数,返回一个ObservableCreate对象,并把参数传了进去,RxJavaPlugins 装配这个new的对象,返回Observable,但实质是ObservableCreate

    RxJavaPlugins是一个hook类,主要就是把你new出来的对象,使用的操作符转换成RxJava具体操作的一个类

    2、.map(new Function<String, String>(){..})

      public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
            Objects.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
      }
    

    因为create已经返回了一个Observable对象,直接使用该对象的map方法。同样也是装配了一个ObservableMap(this,mapper)
    this就是ObservableCreate对象
    mapper是需要转换的方法
    这时候ObservableCreateObservableMap已经连起来了
    3、.subscribeOn(Schedulers.io())

      public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
      }
    

    装配返回一个Observable对象实质是ObservableSubscribeOn(this, scheduler)
    this是ObservableMap对象
    scheduler是Schedulers.io()线程调度对象,订阅放到了io线程 或者是newThread、single...
    这时候 上游的ObservableMap已经和ObservableSubscribeOn连起来了,并且指定了订阅的地方是在io线程
    4、.observeOn(AndroidSchedulers.mainThread())

      public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
      }
    

    装配返回Observable对象,实质是ObservableObserveOn
    this传入的是ObservableSubscribeOn
    scheduler是Android的主线程
    这时候ObservableSubscribeOnObservableObserveOn连起来了
    5、.subscribe(new Observer<String>(){onSubscribe、onNext...等方法})

    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
          try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
            subscribeActual(observer);  //关键方法
          } catch (NullPointerException e) {
    
          } catch (Throwable e) {
    
          }
    }
    

    主要就是subscribeActual(observer);当前对象是ObservableObserveOn,也就是ObservableObserveOn调用了subscribeActual(observer) 并把观察者的对象传了进来,也就是 onSubscribe、onNext...等方法 的对象

    到这里创建、订阅、关联已经基本完成

    前面的类名比较乱,什么ObservableObserveOn,又是什么ObservableSubscribeOn,还有Observable,后面还有ObserveOnObserver等等 这些我第一次看也比较晕,绕了半天,后来想想还是根据英文意思来看
    比如Observable 中文 可观察的 也就是 被观察者
    Observe 中文 观察的
    ObservableObserveOn 中文 被观察者 观察 在...  
    ObservableSubscribeOn 被观察者 订阅 在...
    接下来是关键

    订阅

    理一下 ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn -> 执行subscribeActual()
    在上面的方法中 执行subscribeActual()对象是最后一个ObservableObserveOn

    ObservableObserveOn.subscribeActual()
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
        } else {
          Scheduler.Worker w = scheduler.createWorker();
          source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
      }
    

    第一个判断是判断scheduler对象是否是TrampolineScheduler,这个类里面注释说 在当前线程上工作,内容放入队列,但不会立即执行,暂时忽略
    下面创建了一个 Worker ,new了一个对象ObserveOnObserver,这个静态类,继承了Observer<T>(Observer<T>是为了下游对象)和Runnable()(run方法中有个判断默认是false,暂时不知道第一个方法是做什么的暂时忽略,网上说与backpressure有关 不是背后的压力,是后面的压力)。
    ObserveOnObserver这个类里面有upstream,有queue,有worker,是用来最后执行,你定义的Observer里面的方法
    source订阅了ObserveOnObserver,这个source对象是ObservableSubscribeOn,执行了他的subscribeActual()

    ObservableSubscribeOn.subscribeActual()
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
          observer.onSubscribe(parent);
          parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    这个类是  被观察者订阅在   定义的是io线程
    首先new了一个对象SubscribeOnObserver 传入了 observer 这个是具体实现最后发射的对象
    接着就调用了observeronSubscribe(parent)方法

      @Override
      public void onSubscribe(Disposable d) {
          if (DisposableHelper.validate(this.upstream, d)) {
              this.upstream = d;
              if (d instanceof QueueDisposable) {
                  //去掉其他代码
              }
    
              queue = new SpscLinkedArrayQueue<>(bufferSize);
              downstream.onSubscribe(this);
          }
      }
    

    这里做一个保留,第二个if判断if (d instanceof QueueDisposable) 判断是否是 一次性队列
    接着创建了一个SpscLinkedArrayQueue这个类里面有一个 AtomicReferenceArray 原子的引用集合,保证发射出去的数据是原子操作不可被打断。
    最后调用了下游的方法,回调到了我们定义的onSubscribe方法中打印log
    回到前面 最后一行

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    分部来看
    开始new了一个SubscribeTask这一个线程,并把parent传了进去,这个parent可以理解为当前实际操作对象 握着 下游 实例的对象
    既然是个线程肯定有run方法

    @Override
    public void run() {
      source.subscribe(parent);
    }
    

    这个source对象 实际是前一个结点的对象也就是map 也就是ObservableMap,这里只定义了还没使用
    接着上面的Task交给了scheduler.scheduleDirect

      @NonNull
      public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
      }
    

    createWorker()是IoScheduler中初始化的也就是指定 被观察者 在那个线程执行操作Schedulers.io()

    public Worker createWorker() {
      return new EventLoopWorker(pool.get()); //这里的get是返回pool的对象
    }
    
    //EventLoopWorker的构造函数
    EventLoopWorker(CachedWorkerPool pool) {
      this.pool = pool;
      this.tasks = new CompositeDisposable();
      this.threadWorker = pool.get();   //这里的get是返回pool里一个ThreadWorker对象
    }
    
    final AtomicReference<CachedWorkerPool> pool;
    
    static final class CachedWorkerPool implements Runnable {
       private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    }
    

    这里有 一个pool.get()对象,这是个原子性的缓存工作池
    CachedWorkerPool是个static的线程类,里面有个队列的数据结构,pool.get();是返回一个ThreadWorker对象,循环找,找到就返回,找不到就new一个然后添加到pool里再返回,这个类也会定期清理里面的worker

    回到scheduleDirect(),第三行 new DisposeTask 一次性的任务 并传入 线程 和 “工作者”对象 。DisposeTask也是一个线程里面的run方法,执行了传入的线程。
    w.schedule(task, delay, unit);在IoScheduler中实现

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime,@NonNull TimeUnit unit) {
      if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
      }
    
      return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
    

    ThreadWorker继承NewThreadWorker 打开scheduleActual()

      @NonNull
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent)      {
          Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
          ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
          if (parent != null) {
              if (!parent.add(sr)) {
                  return sr;
                }
              }
    
              Future<?> f;
              try {
                if (delayTime <= 0) {
                  f = executor.submit((Callable<Object>)sr);
                  } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                  }
                  sr.setFuture(f);
                  } catch (RejectedExecutionException ex) {
                    if (parent != null) {
                      parent.remove(sr);
                    }
                    RxJavaPlugins.onError(ex);
                  }
    
                  return sr;
                }
    

    把线程和一个一次性的工作对象放入ScheduledRunnable,在塞入线程池executor.schedule(),前面的判断也很明显是否要延迟。
    再回到前面scheduler.scheduleDirect返回一个 DisposeTask,再回到前面ObservableSubscribeOn.subscribeActual()

    //这里复制是方法最开始的代码
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
          observer.onSubscribe(parent);
          parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    parent.setDisposable( 把DisposeTask设置了一下 )

      DisposableHelper.setOnce(this, d);
    
      public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
          Objects.requireNonNull(d, "d is null");
            if (!field.compareAndSet(null, d)) {   
                d.dispose();            //主要是这个方法
                if (field.get() != DISPOSED) {
                  reportDisposableSet();
                }
                return false;
              }
              return true;
          }
    

    这个d.dispose() d对象是CompositeDisposable,这个dispose()里面主要就是把一个boolean disposed取反,主要为了 和之前绑定的线程 只能执行一次操作
    到这里基本上前期的准备工作已经做好
    回顾一下,开始create() 创建了ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn 这些都是连起来的,并且都有一个source对象,source指向的上游对象
    最后subscribe(),执行ObservableObserveOnsubscribeActual()方法, 通过对source的subscribe(),在ObservableSubscribeOn中定义了Task线程并放到线程池中
    ,等待执行

    SubscribeTask执行
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
          }
    
          @Override
          public void run() {
            source.subscribe(parent);
          }
        }
    

    这个类是写在ObservableSubscribeOn中,所以source是ObservableMap对象subscribe() 实际执行还是subscribeActual(..)

    @Override
    public void subscribeActual(Observer<? super U> t) {
      source.subscribe(new MapObserver<T, U>(t, function));
    }
    

    这个source 实际对象是 ObservableCreate,调用了subscribe传入了new MapObserver

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      CreateEmitter<T> parent = new CreateEmitter<>(observer);
      observer.onSubscribe(parent);
    
      try {
        source.subscribe(parent);
        } catch (Throwable ex) {
          Exceptions.throwIfFatal(ex);
          parent.onError(ex);
        }
      }
    

    这里创建了一个发射器对象CreateEmitter,传入了一个observer,这个observer就是上面传过来的MapObserver
    下面这个MapObserver订阅了这个发射器对象,也就是说这个发射器对象交给了下游的MapObserver
    再下面的source就是MainActivity(这个名字无所谓主要是后面的对象 具体是在什么地方new的)的ObservableOnSubscribe
    开始source.subscribe(parent)回调到了我们实例化ObservableOnSubscribe中,重写的方法,也就是这里

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("hello world");//这个e是之前定义的发射器对象CreateEmitter
            e.onComplete();
        }
    })
    

    发射器onNext("hello world")进入CreateEmitteronNext()

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(ExceptionHelper.createNullPointerException("onNext    called with a null value."));
            return;
          }
          if (!isDisposed()) {
            observer.onNext(t);
          }
        }
    

    先是一个判空,然后一次性检查,最后observer.onNext,这个observer是初始化传入的,之前说了是MapObserver

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }
    
        U v;
    
        try {
            v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }
    

    第二个判断看变量注解是 保持上游已建立融合模式 ,这个也不是非常明白,看代码也明白这里肯定是false
    下一步 mapper.apply(t),这个mapper是一个Function对象,初始化的时候被赋值的,也就是创建的时候,也就是我们需要一个map操作符的时候,创建赋值的
    这个apply会回调到map操作符 里面的apply(方法)。map是一个数据转换的,比如发射了int用map转换成String
    转换后调用了下游的onNext,也就是下游的具体操作对象,SubscribeOnObserver

    @Override
    public void onNext(T t) {
      downstream.onNext(t);
    }
    

    这里什么也没干 直接调用下游onNext,ObserveOnObserver

    @Override
    public void onNext(T t) {
      if (done) {
        return;
      }
    
      if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
      }
      schedule();
    }
    
    void schedule() {
      if (getAndIncrement() == 0) {
          worker.schedule(this);
      }
    }
    

    onNext 第二个判断不知道是我AS抽风了还是什么原因 Alt+单击 看到的一直是false但是真正的确实 true,这里面是把值 原子性的 放到队列中
    继续schedule()
    这里判断保证只做一次。worker.schedule(this) 传入了一个线程也就是本身this
    这里的worker是HandlerWorker是AndroidSchedulers.mainThread(),我们定义在android的主线程执行内容。
    我们知道线程执行是执行run方法,直接跳到run(),暂时忽略Android里面handler发送消息 handler.sendMessageDelayed(message, unit.toMillis(delay));只要知道run是在主线程执行的就ok

    @Override
    public void run() {
      if (outputFused) {
        drainFused();
        } else {
          drainNormal();
        }
      }
    

    这里的if 也是和融合相关,暂时忽略

    drainNormal()

    关键的来了

    void drainNormal() {
    int missed = 1;
    
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;
    
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
          }
    
          for (;;) {
            boolean d = done;
            T v;
    
            try {
                v = q.poll();
                } catch (Throwable ex) {
                  Exceptions.throwIfFatal(ex);
                  disposed = true;
                  upstream.dispose();
                  q.clear();
                  a.onError(ex);
                  worker.dispose();
                  return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                  return;
                }
    
                if (empty) {
                  break;
                }
    
                a.onNext(v);
              }
    
              missed = addAndGet(-missed);
              if (missed == 0) {
                break;
              }
            }
          }
    

    乍一看。。两无限循环。。一看就好麻烦。。。
    看到开始声明了两个变量一个q和a,q是队列,a是下游对象,再看到a.onNext()基本胜利一半了,里面内容无非就是从队列中获取数据,然后发射到下游去。
    检查是否中止,不中止进入下一个循环,这个循环就比较简单了,从队列中取一个值,然后判空,不为空就发射,这个a对象就是我们定义的Observer中的onNext方法
    再后面是一个原子性操作,这个我不确定作用具体是什么,忙猜是防止漏掉数据,为外面的循环多一层跳出的方式。希望有人能解决我的疑惑。我也不知道为什么这里会有for的无限循环。。这可读性太差了(应该是我太菜了🤣)
    看一下他是怎么检查中止的

    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a)  {
      if (disposed) {
          queue.clear();
          return true;
        }
        if (d) {
          Throwable e = error;
          if (delayError) {
              if (empty) {
                  disposed = true;
                  if (e != null) {
                  a.onError(e);
                } else {
                  a.onComplete();
                }
                  worker.dispose();
                  return true;
                }
          } else {
              if (e != null) {
                disposed = true;
                queue.clear();
                a.onError(e);
                worker.dispose();
                return true;
                } else
                if (empty) {
                  disposed = true;
                  a.onComplete();
                  worker.dispose();
                  return true;
                }
            }
        }
        return false;
    }
    

    判断是否是disposed,接着d是个boolean是 传进来的done,影响这个done是有在onError()onComplete()的时候
    如果 手动 或者try cratch 抛出了异常 这个delayError会为true 这里会调用a.onError(e);否则调用结束
    这里结束的时候有个 worker.dispose(); 在最开始初始化的时候 一顿封装后 把DisposeTask交给了worker对象

    @Override
    public void dispose() {
      if (once.compareAndSet(false, true)) {
        tasks.dispose();
    
        // releasing the pool should be the last action
        pool.release(threadWorker);
      }
    }
    
    void release(ThreadWorker threadWorker) {
      // Refresh expire time before putting worker back in pool
      threadWorker.setExpirationTime(now() + keepAliveTime);
    
      expiringWorkerQueue.offer(threadWorker);
    }
    

    这里官方的注释已经写明了 释放pool,将工作线程放回pool并刷新过期时间,还调用了dispose()处理一些一次性的标记
    到这里基本一整个的流程基本上结束了

    总结

    个人觉着这个RxJava就是一个个的结点+链式调用,创建一个方法,它帮你用一个实际操作的类包起来,每个结点都有一个源结点,这个源结点指向的前一个对象,最后 subscribe 订阅时候 开始执行,一步步调用 源 结点,并一步步在结点中初始化,推到最前端,调用定义的方法,e.onNext() 继续调用下游的onNext() 最后调用到Observer的观察者方法,从而形成一个完整的 链。
    我画了一个过程的图

    最开始使用的结点图
    create() 创建了ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn -> Observer
    每个对象都像一个结点,结点之间进行连接,当如果把map这个结点去掉,或者增加其他结点也是没问题的
    感觉有些不足比如java的一些原子性对象的使用,准备来全面学习一下

    如果内容有错误请及时联系我,我并加以改正

    相关文章

      网友评论

          本文标题:学习笔记RxJava

          本文链接:https://www.haomeiwen.com/subject/wfbuxktx.html