美文网首页
模拟RxJava的实现原理

模拟RxJava的实现原理

作者: vpractical | 来源:发表于2019-10-14 16:21 被阅读0次

    [TOC]

    GitHub代码地址

    使用

    Observable
                   .create(new ObservableOnSubscribe<List<User>>() {
                       @Override
                       public void subscribe(Emitter<List<User>> emitter) {
                           emitter.next(list);
                           emitter.complete();
                           log("create : " + Thread.currentThread().getName());
                       }
                   })
                   .map(new Function<List<User>, List<User>>() {
                       @Override
                       public List<User> apply(List<User> users) {
                           for (User u : users) {
                               u.age = 3;
                           }
                           return users;
                       }
                   })
                   .subscribeOn(ThreadScheduler.IO)
                   .observerOn(ThreadScheduler.MAIN)
                   .subscribe(new Observer<List<User>>() {
                       @Override
                       public void onSubscribe(Disposable disposable) {
                           log("onSubscribe()");
                           log("onSubscribe() : " + Thread.currentThread().getName());
                       }
    
                       @Override
                       public void onNext(List<User> val) {
                           log("onNext(): " + val.toString());
                           log("onNext() : " + Thread.currentThread().getName());
                       }
    
                       @Override
                       public void onError(Throwable t) {
                           log("onError(): " + t.toString());
                       }
    
                       @Override
                       public void onComplete() {
                           log("onComplete()");
                       }
                   });
    

    实现

    1.被观察者抽象类

    • rxjava使用过程,每个操作符都是一层,每一层都是独立的,观察上一层,同时被下一层观察。最上层不用向上观察,他持有一个内容分发者,这是一个接口,用户实现数据分发操作。最下层不被观察,他只需要观察上一层。
    • 各个操作符的方法,返回本层被观察者的实现类,如create返回ObservableCreate
    • 各个被观察者继承这个抽象类,实现subscribeImpl,参数是下一层的观察者,由下一层调用,以此建立订阅关系
    • 除最上层,每层持有上一层的被观察者对象,调用subscribeImpl实现订阅,上一层在被订阅时同时订阅更上一层。从最下层的观察者,依次向上层订阅
    public abstract class Observable<T> {
    
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source){
            return new ObservableCreate<>(source);
        }
        public void subscribe(Observer<? super T> observer){
            subscribeImpl(observer);
        }
        public <R> Observable<R> map(Function<? super T,? extends R> function){
            return new ObservableMap<>(this,function);
        }
        public Observable<T> subscribeOn(ThreadScheduler scheduler){
            return new ObservableSubscribeOn<>(this,scheduler);
        }
        public Observable<T> observerOn(ThreadScheduler scheduler){
            return new ObservableObserverOn<>(this,scheduler);
        }
        protected abstract void subscribeImpl(Observer<? super T> observer);
    }
    

    2.create操作符:返回这一层的被观察者ObservableCreate
    rxjava 有3个主要构成

    • 被观察者
    • 观察者
    • 内容分发者:在第一层的被观察者中分发数据
      需要注意的是,rxjava不止是一层一层叠起来的结构,可能某一层中包含了多个层,多个内容分发源头等
    public interface ObservableOnSubscribe<T> {
        void subscribe(Emitter<T> emitter) throws Exception;
    }
    
    public interface Emitter<T> {
        void next(T val);
        void error(Throwable t);
        void complete();
    }
    

    参数是内容分发者接口对象,用户实现该接口后,源码调用订阅方法subscribe(),并创建发射器emitter,用户使用emitter分发数据。

    /**
     * create操作符对应的被观察者
     */
    class ObservableCreate<T> extends Observable<T> {
        private ObservableOnSubscribe<T> source;
    
        ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        @Override
        protected void subscribeImpl(Observer<? super T> observer) {
            EmitterCreate<T> emitter = new EmitterCreate<>(observer);
            observer.onSubscribe(emitter);
            try {
                source.subscribe(emitter);
            }catch (Exception e){
                e.printStackTrace();
                observer.onError(e);
            }
        }
        static final class EmitterCreate<T> implements Emitter<T>, Disposable {
            private Observer<? super T> observer;
            private boolean isDisposable;
            EmitterCreate(Observer<? super T> observer) {
                this.observer = observer;
            }
            @Override
            public void disposable() {
                isDisposable = true;
            }
            @Override
            public boolean isDisposable() {
                return isDisposable;
            }
            @Override
            public void next(T val) {
                if(isDisposable) return;
                observer.onNext(val);
            }
            @Override
            public void error(Throwable t) {
                if(isDisposable) return;
                observer.onError(t);
            }
            @Override
            public void complete() {
                if(isDisposable) return;
                observer.onComplete();
            }
        }
    }
    

    3.map操作符

    • 这一层有观察上一层的观察者MapObserver,ObservableMap本身是这一层的被观察者,在被下一层的观察者订阅时,本层的MapObserver同时订阅了上一层的被观察者
    class ObservableMap<T, R> extends Observable<R> {
        private Observable<T> observable;
        private Function<? super T, ? extends R> function;
        ObservableMap(Observable<T> observable, Function<? super T, ? extends R> function) {
            this.observable = observable;
            this.function = function;
        }
        @Override
        protected void subscribeImpl(Observer<? super R> observer) {
            observable.subscribe(new MapObserver<>(observer,function));
        }
        private static final class MapObserver<T,R> extends Basic2Observer<T,R> {
            Function<? super T,? extends R> function;
            MapObserver(Observer<? super R> observer,Function<? super T,? extends R> function){
                super(observer);
                this.function = function;
            }
            @Override
            public void onNext(T val) {
                R r = function.apply(val);
                observer.onNext(r);
            }
        }
    }
    
    • map操作符实现的数据类型转换,其实是参数传入了一个接口Function,内部定义方法apply()参数是逆变泛型T,返回值是协变泛型R,用于转换数据类型,apply的实现由用户实现
    public interface Function<T,R> {
        R apply(T t);
    }
    

    4.subscribeOn操作符:

    • 线程调度实现,判断用户传入的线程和当前线程的关系,然后看是直接订阅or放到子线程or主线程
    • 也是有自己层的被观察者和观察者对象
    class ObservableSubscribeOn<T> extends Observable<T>{
        private Observable<T> observable;
        private ThreadScheduler scheduler;
        ObservableSubscribeOn(Observable<T> observable, ThreadScheduler scheduler){
            this.observable = observable;
            this.scheduler = scheduler;
        }
        @Override
        protected void subscribeImpl(final Observer<? super T> observer) {
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    BasicObserver<T> subscribeOnObserver = new BasicObserver<>(observer);
                    observable.subscribe(subscribeOnObserver);
                }
            };
            TaskScheduler.run(r,scheduler);
        }
    }
    
    public class TaskScheduler {
        private static final Handler HANDLER = new Handler(Looper.getMainLooper());
        private static final ExecutorService SERVICE = Executors.newCachedThreadPool();
    
        public static void run(Runnable r,ThreadScheduler scheduler){
            boolean isMainThread = Looper.myLooper() == Looper.getMainLooper();
            if(scheduler == ThreadScheduler.DEFAULT){
                r.run();
            }else if(scheduler == ThreadScheduler.MAIN){
                if(isMainThread){
                    r.run();
                }else{
                    HANDLER.post(r);
                }
            }else{
                SERVICE.submit(r);
            }
        }
    }
    

    5.后续的操作符都是同样的道理,在最下层只有一个观察者,他在调用上层的订阅方法时,上层会先回调他的onSubscribe(),参数是Disposable ,由源码实现,当上层回调时,如果他调用Disposable的disposable()方法,上层会中断事件传递

    public interface Disposable {
        void disposable();
        boolean isDisposable();
    }
    

    相关文章

      网友评论

          本文标题:模拟RxJava的实现原理

          本文链接:https://www.haomeiwen.com/subject/mrirmctx.html