美文网首页Android开发Android开发经验谈Android技术知识
观察者模式在RxJava中的运用(一)RxJava整体框架分析

观察者模式在RxJava中的运用(一)RxJava整体框架分析

作者: 程序员三千_ | 来源:发表于2020-04-18 22:04 被阅读0次

    观察者模式在RxJava中的运用(一)RxJava整体框架分析

    1、传统观察者模式的定义

    • 抽象被观察者角色
      Observable接口:声明add、remvoe、notifyObservers方法

    • 抽象观察者角色
      Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。

    • 具体观察者角色
      实现Observer接口

    • 具体被观察者角色(也叫具体主题)
      实现Observable接口,定义一个集合存储所有的观察者
      通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
      观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。

    2、观察者模式在java(jdk)中的体现

    • 抽象被观察者角色
      Observable类,定义了存储观察者的集合(Vector<Observer> obs)和add、remvoe、notifyObservers方法,
      用Vector不用list的原因是考虑到线程安全的问题,

    • 抽象观察者角色
      Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。

    • 具体观察者角色
      实现Observer接口

    • 具体被观察者角色
      实现Observable接口,通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
      观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。

    3、观察者模式和发布订阅模式区别和联系

    观察者模式和发布订阅模式其实它们的主要思想是一样的,但是在观察者模式中,被观察者里保存了所有的观察者(集合),
    而在发布订阅模式中,被观察者里是不保存观察者集合的。发布订阅模式比起观察者模式,耦合度更低而已

    4、观察者模式在RxJava中的体现

     //1. 创建一个Observable  可被观察的
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    if(!emitter.isDisposed()){
                        emitter.onNext("hello rxjava");
                        emitter.onNext("1234");
                    }
                    emitter.onComplete();
                }
            });
            //2. 创建一个Observer 观察者
            Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i(TAG,"onSubscribe: " + d);
                }
    
                @Override
                public void onNext(String s) {
                    Log.i(TAG,"onNext: " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG,"onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG,"onComplete ");
                }
            };
            //3 观察者通过订阅(subscribe)被观察者 把它们连接到一起
            //observer(观察者) 订阅 observable(被观察者)
            observable.subscribe(observer);
    
    
    
    • 1、在jdk中被观察者是通过add往集合里添加观察者的,RxJava中是通过observable.subscribe(observer)的方式。

    • 2、那么在RxJava中前面说的四个重要的角色是怎么定义的呢?它们在RxJava中是怎么实例化的?是怎么订阅和消息传递的呢?

    5、RxJava源码分析

    我们先看MainActivity类里的Observable.create方法
    MainActivity#Observable.create
    Observable#public static <T> Observable<T> create
    ObservableCreate# ObservableCreate<T> extends Observable<T>

    看到ObservableCreate<T> extends Observable<T>其实我们能联想到在jdk中的观察者模式,

    • 具体被观察者角色
      final class ObservableCreate<T>

    • 抽象被观察者角色
      abstract class Observable<T>

    我们再看到MainActivity类里的new Observer<String>()方法
    MainActivity#new Observer<String>()
    interface Observer<T>

    • 抽象观察者角色
      interface Observer<T>
      onNext方法就相当于我们前面提到的update方法

    • 具体的观察者角色
      匿名类Observer<String> observer

      就是我们在MainActivity中通过new一个Observer接口的方式产生的匿名类observer。这个匿名类就是
      具体的观察者,最后通过observable.subscribe(observer);把观察者和被观察者绑定,产生订阅-被订阅关系。

      我们知道,在java中抽象被观察者里是有一个集合用来存放了观察者的,所以我们进到抽象类Observable的subscribe方法看看

     public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    

    前面是一些判空操作,进入subscribeActual,我们发现subscribeActual是一个抽象方法,那么在抽象被观察者角色Observable里的具体
    实现肯定是在具体被观察者角色ObservableCreate里的subscribeActual

    ObservableCreate#subscribeActual

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    通过new CreateEmitter<T>(observer)创建一个发射器parent,并把观察者者传入到发射器CreateEmitter里
    再调用抽象观察者对象Observer的onSubscribe,因为是抽象类的抽象方法,所以实际上就是调用我们在MainActivity创建的匿名类observer里的
    onSubscribe方法,这里把发射器传入到onSubscribe里,发射器具体传入到这里面什么作用,我们下面再分析,我们先看下一句
    source.subscribe(parent);,这一句代码实际上就是真正产生订阅操作的关键代码,这里的source就是我们上面Observable.create
    时候传进来的ObservableOnSubscribe对象。所以这里的onSubscribe方法就是MainActivity里Observable.create的创建ObservableOnSubscribe的时候实现的subscribe并把parent传了进去,parent是CreateEmitter<T>对象,subscribe里面调用的next实际就是调用了
    parent的next方法,也就是CreateEmitter类的next方法。我们再看到CreateEmitter#next

     @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    里面调用了抽象观察者observer.onNext(t);就是调用到了MainActivity里匿名类observer的next方法。在调用onNext之前,判断了isDisposed
    的值,这个isDisposed()值其实就是上面分析的observer.onSubscribe(parent);,我们可以在匿名类的onSubscribe方法里去中断事件。
    到这里我们RxJava里整个创建观察者和被观察者,及其绑定和收发消息的流程就都通了。
    其实我们把RxJava里实现的观察者模式看成发布订阅模式更为好理解一些,因为它在实现的时候,在被观察者中也是没有存储观察者对象,
    是把观察者传到了CreateEmitter发射器里这一点和发布订阅模式是类似的。

    最后我们总结下:

    在RxJava中四个重要的角色

    • 具体被观察者角色
      final class ObservableCreate<T>

    • 抽象被观察者角色
      abstract class Observable<T>

    • 抽象观察者角色
      interface Observer<T>

    • 具体的观察者角色
      匿名类Observer<String> observer

    一开始,我们会创建一个observable对象,然后调用subscribe里的发射器的onNext发送消息;第二步:创建一个Observer匿名类观察者,在onNext里接收消息,这里可以在onSubscribe里通过Disposable
    中断这个事件。

    相关文章

      网友评论

        本文标题:观察者模式在RxJava中的运用(一)RxJava整体框架分析

        本文链接:https://www.haomeiwen.com/subject/jpdzvhtx.html