美文网首页
RxJava源码(二)

RxJava源码(二)

作者: Ayres | 来源:发表于2018-01-01 18:05 被阅读0次

    使用时:

          Observable.just("xxxxx")
                .map(new Function<String, Object>() {
                    @Override
                    public Object apply(@NonNull String s) throws Exception {
                        return null;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        
                    }
    
                    @Override
                    public void onNext(@NonNull Object o) {
    
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
    
                    }
    
                    @Override
                    public void onComplete() {
    
                    }
                });
    

    注意到

                 .subscribeOn(Schedulers.io())//子线程执行
                .observeOn(AndroidSchedulers.mainThread()) //主线程
    

    一、子线程执行

    执行
    代表线程切换,源码分析

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    进入ObservableSubscribeOn类,只需要分析

     @Override
     public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    

    方法中

          parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    SubscribeTask是一个Runnable

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    scheduler.scheduleDirect()方法传入一个Runnable ,scheduler为
    .subscribeOn(Schedulers.io())中Schedulers.io()看一下代码

      @NonNull
      public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    

    IO初始化为

    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    

    进一步

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    

    IoHolder.DEFAULT

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    

    IoScheduler

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    

    WORKER_THREAD_FACTORY为线程工厂

      static final RxThreadFactory WORKER_THREAD_FACTORY;
    

    看一下 this(WORKER_THREAD_FACTORY);

       public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    /**
     * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
     *                      system properties for configuring new thread creation. Cannot be null.
     */
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
    

    pool 为线程池 , start();方法

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    

    CachedWorkerPool类

                private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;//线程安全的集合,联想到eventbus中保证集合如下,先复制,再添加
    
    image.png

    总之是封装一个线程池对象,回到

         parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    中scheduler.scheduleDirect()方法再Scheduler类

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    

    scheduleDirect具体实现

      @NonNull
     public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
    
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        DisposeTask task = new DisposeTask(decoratedRun, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    

    createWorker();方法在IoScheduler类实现

      @Override
      public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    

    w.schedule(task, delay, unit);方法IoScheduler类实现

        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
    
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

    threadWorker为

       EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
    

    threadWorker.scheduleActual()为NewThreadWorker

          public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }
    

    是一些线程池

    二、 主线程执行

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    ObservableObserveOn类中

     @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    发现

            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    

    onNext中

       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
    
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
    

    schedule();方法

      void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    

    .observeOn(AndroidSchedulers.mainThread())方法

       private static final class MainHolder {
    
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    

    HandlerScheduler类

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    

    new Handler(Looper.getMainLooper()) 精华 new Handler() 和 new Handler(Looper.getMainLooper()) 的区别
    new Handler() , 如果是在主线程中是没问题,但是有的时候可能会在子线程中调用,肯定就报错。
    new Handler(Looper.getMainLooper()),确保创建的Handler永远在主线程中,Looper要是主线程的Looper。

    三、简单实现子线程

    Observable

    public Observable<Bitmap> subscribeOn(Schedulers schedulers) {
        return onAssembly(new ObservableSchedulers(this,schedulers));
    }
    

    ObservableSchedulers

    final class ObservableSchedulers<T> extends Observable<T> {
    final Observable<T> source;
    final Schedulers schedulers;
    
    public ObservableSchedulers(Observable<T> source, Schedulers schedulers) {
        this.source = source;
        this.schedulers = schedulers;
    }
    
    @Override
    protected void subscribeActual(Observer<T> observer) {
        schedulers.scheduleDirect(new SchedulerTask(observer));
    }
    
    private class SchedulerTask implements Runnable{
        final Observer<T> observer;
        public SchedulerTask(Observer<T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void run() {
            // 线程池最终回来执行 Runnable -> 这行代码,会执行上游的 subscribe()
            // 而这个run方法在子线程中
            source.subscribe(observer);
        }
    }
    

    Schedulers 类

    public abstract class Schedulers {
    static Schedulers IO;
    static {
        IO = new IOSchedulers();
    }
    
    public static Schedulers io() {
        return IO;
    }
    
    public abstract void scheduleDirect(Runnable runnable);
    
    private static class IOSchedulers extends Schedulers {
        ExecutorService service;
        public IOSchedulers(){
            service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(@NonNull Runnable r) {
                    return new Thread(r);
                }
            });
        }
    
        @Override
        public void scheduleDirect(Runnable runnable) {
            service.execute(runnable);
        }
    }
    }
    

    四、主线程实现

    Observable中

      public Observable<T> observerOn(Schedulers schedulers) {
        return onAssembly(new ObserverOnObservable(this,schedulers));
    }
    

    ObserverOnObservable

    class ObserverOnObservable<T> extends Observable<T> {
    final Observable<T> source;
    final Schedulers schedulers;
    public ObserverOnObservable(Observable<T> source, Schedulers schedulers) {
        this.source = source;
        this.schedulers = schedulers;
    }
    
    @Override
    protected void subscribeActual(Observer<T> observer) {
        source.subscribe(new ObserverOnObserver(observer,schedulers));
    }
    
    private class ObserverOnObserver<T> implements Observer<T>,Runnable{
        final Observer<T> observer;
        final Schedulers schedulers;
        private T value;
        public ObserverOnObserver(Observer<T> observer, Schedulers schedulers) {
            this.observer = observer;
            this.schedulers = schedulers;
        }
    
        @Override
        public void onSubscribe() {
            observer.onSubscribe();
        }
    
        @Override
        public void onNext(@NonNull T item) {
            value = item;
            schedulers.scheduleDirect(this);
    
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }
    
        @Override
        public void onComplete() {
            observer.onComplete();
        }
    
        @Override
        public void run() {
            // 主线程 或者 其他
            observer.onNext(value);
        }
    }
    

    }
    完善Schedulers

    public abstract class Schedulers {
    static Schedulers MAIN_THREAD;
    static Schedulers IO;
    static {
        IO = new IOSchedulers();
        MAIN_THREAD = new MainSchedulers(new Handler(Looper.getMainLooper()));
    }
    
    public static Schedulers io() {
        return IO;
    }
    
    public abstract void scheduleDirect(Runnable runnable);
    
    public static Schedulers mainThread() {
        return MAIN_THREAD;
    }
    
    private static class IOSchedulers extends Schedulers {
        ExecutorService service;
        public IOSchedulers(){
            service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(@NonNull Runnable r) {
                    return new Thread(r);
                }
            });
        }
    
        @Override
        public void scheduleDirect(Runnable runnable) {
            service.execute(runnable);
        }
    }
    
    private static class MainSchedulers extends Schedulers {
        private Handler handler;
        public MainSchedulers(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public void scheduleDirect(Runnable runnable) {
            Message message = Message.obtain(handler,runnable);
            handler.sendMessage(message);
        }
    }
    

    相关文章

      网友评论

          本文标题:RxJava源码(二)

          本文链接:https://www.haomeiwen.com/subject/bwstnxtx.html