美文网首页Android开发Android开发经验谈Android技术知识
观察者模式在RxJava中的运用(一)RxJava整体框架分析

观察者模式在RxJava中的运用(一)RxJava整体框架分析

作者: 程序员三千_ | 来源:发表于2020-04-18 22:04 被阅读0次

观察者模式在RxJava中的运用(一)RxJava整体框架分析

1、传统观察者模式的定义

  • 抽象被观察者角色
    Observable接口:声明add、remvoe、notifyObservers方法

  • 抽象观察者角色
    Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。

  • 具体观察者角色
    实现Observer接口

  • 具体被观察者角色(也叫具体主题)
    实现Observable接口,定义一个集合存储所有的观察者
    通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
    观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。

2、观察者模式在java(jdk)中的体现

  • 抽象被观察者角色
    Observable类,定义了存储观察者的集合(Vector<Observer> obs)和add、remvoe、notifyObservers方法,
    用Vector不用list的原因是考虑到线程安全的问题,

  • 抽象观察者角色
    Observer接口:声明update方法,当被观察者调用notifyObservers方法时,观察者的update方法就会被调用到。

  • 具体观察者角色
    实现Observer接口

  • 具体被观察者角色
    实现Observable接口,通过add方法向集合里添加观察者、通过remvoe方法移除集合里的观察者、通过notifyObservers方法通知所有的
    观察者(也就是list里的观察者),然后每个观察者通过update方法更新接收到的消息。

3、观察者模式和发布订阅模式区别和联系

观察者模式和发布订阅模式其实它们的主要思想是一样的,但是在观察者模式中,被观察者里保存了所有的观察者(集合),
而在发布订阅模式中,被观察者里是不保存观察者集合的。发布订阅模式比起观察者模式,耦合度更低而已

4、观察者模式在RxJava中的体现

 //1. 创建一个Observable  可被观察的
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                if(!emitter.isDisposed()){
                    emitter.onNext("hello rxjava");
                    emitter.onNext("1234");
                }
                emitter.onComplete();
            }
        });
        //2. 创建一个Observer 观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG,"onSubscribe: " + d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,"onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"onComplete ");
            }
        };
        //3 观察者通过订阅(subscribe)被观察者 把它们连接到一起
        //observer(观察者) 订阅 observable(被观察者)
        observable.subscribe(observer);


  • 1、在jdk中被观察者是通过add往集合里添加观察者的,RxJava中是通过observable.subscribe(observer)的方式。

  • 2、那么在RxJava中前面说的四个重要的角色是怎么定义的呢?它们在RxJava中是怎么实例化的?是怎么订阅和消息传递的呢?

5、RxJava源码分析

我们先看MainActivity类里的Observable.create方法
MainActivity#Observable.create
Observable#public static <T> Observable<T> create
ObservableCreate# ObservableCreate<T> extends Observable<T>

看到ObservableCreate<T> extends Observable<T>其实我们能联想到在jdk中的观察者模式,

  • 具体被观察者角色
    final class ObservableCreate<T>

  • 抽象被观察者角色
    abstract class Observable<T>

我们再看到MainActivity类里的new Observer<String>()方法
MainActivity#new Observer<String>()
interface Observer<T>

  • 抽象观察者角色
    interface Observer<T>
    onNext方法就相当于我们前面提到的update方法

  • 具体的观察者角色
    匿名类Observer<String> observer

    就是我们在MainActivity中通过new一个Observer接口的方式产生的匿名类observer。这个匿名类就是
    具体的观察者,最后通过observable.subscribe(observer);把观察者和被观察者绑定,产生订阅-被订阅关系。

    我们知道,在java中抽象被观察者里是有一个集合用来存放了观察者的,所以我们进到抽象类Observable的subscribe方法看看

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

前面是一些判空操作,进入subscribeActual,我们发现subscribeActual是一个抽象方法,那么在抽象被观察者角色Observable里的具体
实现肯定是在具体被观察者角色ObservableCreate里的subscribeActual

ObservableCreate#subscribeActual

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

通过new CreateEmitter<T>(observer)创建一个发射器parent,并把观察者者传入到发射器CreateEmitter里
再调用抽象观察者对象Observer的onSubscribe,因为是抽象类的抽象方法,所以实际上就是调用我们在MainActivity创建的匿名类observer里的
onSubscribe方法,这里把发射器传入到onSubscribe里,发射器具体传入到这里面什么作用,我们下面再分析,我们先看下一句
source.subscribe(parent);,这一句代码实际上就是真正产生订阅操作的关键代码,这里的source就是我们上面Observable.create
时候传进来的ObservableOnSubscribe对象。所以这里的onSubscribe方法就是MainActivity里Observable.create的创建ObservableOnSubscribe的时候实现的subscribe并把parent传了进去,parent是CreateEmitter<T>对象,subscribe里面调用的next实际就是调用了
parent的next方法,也就是CreateEmitter类的next方法。我们再看到CreateEmitter#next

 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

里面调用了抽象观察者observer.onNext(t);就是调用到了MainActivity里匿名类observer的next方法。在调用onNext之前,判断了isDisposed
的值,这个isDisposed()值其实就是上面分析的observer.onSubscribe(parent);,我们可以在匿名类的onSubscribe方法里去中断事件。
到这里我们RxJava里整个创建观察者和被观察者,及其绑定和收发消息的流程就都通了。
其实我们把RxJava里实现的观察者模式看成发布订阅模式更为好理解一些,因为它在实现的时候,在被观察者中也是没有存储观察者对象,
是把观察者传到了CreateEmitter发射器里这一点和发布订阅模式是类似的。

最后我们总结下:

在RxJava中四个重要的角色

  • 具体被观察者角色
    final class ObservableCreate<T>

  • 抽象被观察者角色
    abstract class Observable<T>

  • 抽象观察者角色
    interface Observer<T>

  • 具体的观察者角色
    匿名类Observer<String> observer

一开始,我们会创建一个observable对象,然后调用subscribe里的发射器的onNext发送消息;第二步:创建一个Observer匿名类观察者,在onNext里接收消息,这里可以在onSubscribe里通过Disposable
中断这个事件。

相关文章

网友评论

    本文标题:观察者模式在RxJava中的运用(一)RxJava整体框架分析

    本文链接:https://www.haomeiwen.com/subject/jpdzvhtx.html