美文网首页Android开发经验谈Android开发Android技术知识
Android开发进阶——RxJava核心架构分析

Android开发进阶——RxJava核心架构分析

作者: 谁动了我的代码 | 来源:发表于2023-02-06 15:20 被阅读0次

简介

RxJava是对响应式扩展( Reactive Extensions,称之为 ReactiveX )规范的Java 实现,该规范还有其他语言实现:RxJS、Rx.Net、RxScala、RxSwift等等(也即,ReactiveX 定义了规范,其他语言实现规范即可,所以我们这里学习RxJava的架构和设计思维,只需研究ReactiveX 即可)。RxJava是一个通过使用可观察序列来组合异步操作(也即观察者模式,观察序列表示一组观察者,后面会详细介绍),并且基于事件驱动的Java库。它基于观察者模式并扩展了支持数据/事件序列的功能,添加了很多在数据转换时使用的操作符(比如:map、flat等等,像不像Java 8的Stream流式编程)。同时,RxJava 抽象了底层线程模型实现、线程安全操作的实现,让使用方不需要关心底层实现,专注于对业务的处理。

原理图解

Rxjava的核心思路被总结在了图中,本文分为两部分,第一部分讲图中的三条流和事件传递,第二部分讲线程切换的原理,下面进入正题。


响应式编程

响应式编程是一种基于异步数据流概念的编程模式;数据/事件就像一条河流,从源头一直往下流,在流动过程中,可以被观测、被过滤、被操作,或者与另一条流合并成一条新的流,最终流向大海被消费掉;

与响应式编程相对应的有同步式编程、异步式编程:

  • 同步式编程:比如我们在主线程上请求一个网络接口,一直等到返回结果才能继续执行下一步,这就是同步式的
  • 异步式编程:开启一个子线程去请求网络接口,主线程继续执行,然后定时去查询接口返回的结果
  • 响应式编程:开启一个子线程去请求网络接口,注册监听后主线程继续执行,网络接口返回数据后,主动回调注册的监听方法,从而达到响应的目的

RxJava可以简单理解为就是观察者模式+异步处理+链式调用(流的概念)

Rxjava需要达成的共识两种设计模式

观察者模式:实现响应式编程的基础

image

装饰器模式:各种操作符的具体实现类都通过装饰器模式类拓展完成

image

Rxjava核心框架核心部分

  • ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
  • Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
  • Observer : 观察者接口,提供处理事件的回调方法。
  • ObservableOnSubscribe:被观察者与事件解耦的接口
  • Emitter : 事件发射的接口,提供发射事件的方法。
  • ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
  • XXXEmitter : 事件发射器具体实现,持有观察者引用。
  • XXXObserver : 具体观察者的实现类。
  • AbstractObservableWithUpStream: 被观察者的抽象装饰类,持有了顶层接口的引用,都是通过继承该抽象类来实现各种操作符的被观察者 。
image

源码分析create()操作符

private static void testCreate() {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println(o);
            }
        });
}

Observable & ObservableSource & ObservableOnSubscribe等

  • ObservableSource : 被观察者的顶层接口,提供订阅subsccribe()方法
  • Observable: 被观察者抽象类,实现ObservableSource的接口,并提供实际订阅的抽象方法。
  • ObservableOnSubscribe :被观察者与事件发送解耦的接口
  • Observer: 观察者接口,提供处理事件的回调方法。可以在此接口的onSubscribe()函数来控制被观察者的事件发送后,观察者能否被消费
  • ObservableXXX: 具体的被观察者实现类,持有
  • ObservableOnSubscribe接口的引用
Observable.java


Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
});


//  ObservableOnSubscribe:被观察者与事件解耦的接口
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}


ObservableCreate.java 

// 1: 构造函数保存 ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
}


@Override
protected void subscribeActual(Observer<? super T> observer) {
     CreateEmitter<T> parent = new CreateEmitter<>(observer);

     // 2: 先回调Observer的onSubscribe()函数
     observer.onSubscribe(parent);
     
     try {
     
         // 3:  ObservableOnSubscribe再 发射事件
         source.subscribe(parent);
     
     } catch (Throwable ex) {
         Exceptions.throwIfFatal(ex);
         parent.onError(ex);
     }
}
  • Emitter : 事件发射的接口,提供发射事件的方法。
  • ObservableXXX: 具体的被观察者实现类,持有ObservableOnSubscribe接口的引用
  • XXXEmitter : 事件发射器具体实现,持有观察者引用。
  • XXXObserver : 具体观察者的实现类。
ObservableCreate.java

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;
     
        final Observer<? super T> observer;
     
        // 1: CreateEmitter 持有下游 Observer的引用
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
     
        @Override
        public void onNext(T t) {
            if (t == null) {
     
                onError(ExceptionHelper.createNullPointerException("onNext called with a 
                      null value."));
     
                return;
            }
     
            // 2: 根据下游的 Observer 的onSubscribe()函数判断是否取消发射,决定Observer是 
              否调用 observer.onNext函数
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
     
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

 


 Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    System.out.println("步骤一 :emitter发射value数据:" + i);

                    // 0 : 上游被观察者传递过来的 emitter
                    emitter.onNext("value=" + i);
                }
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Throwable {
                System.out.println("步骤二消费事件:" + o);
            }
        });

map()操作符源码分析

 /**
     *  直接对发射出来的事件进行处理并且产生新的事件,然后再次发射
     */
    private static void testMap() {
        Observable.just("aaa")
                .map(new Function<String, Object>() {
                    @Override
                    public Object apply(String s) throws Throwable {
                        System.out.println("步骤二: "+"事件转换之后再次发射");
                        return s+" + bbb";
                    }
                }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("步骤一:"+ "use Subscribe connect Observable and Observer");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("步骤三: "+"Next event:" + o + " response");
            }
     
            @Override
            public void onError(Throwable e) {
     
            }
     
            @Override
            public void onComplete() {
     
            }
        });
    }

Observable.just() 生产:ObservableJust类,并在发射事件时调用subscribeAcutal函数

ObservableJust.java

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }
     
    // Observer 是下游的最后一个 observer
     
    // subscribeActual 是 抽象类 Observable的具体实现类ObservableJust,它会在
   // Observable的 subscribe()中被回调

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
     
    @Override
    public T get() {
        return value;
    }
}

 

Observable.java

public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            
            // 利用java多态的特性,直接调用 ObservableJust.java中的 subscribeActual
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
     
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

被观察者通过 ObservableScalarXMap.ScalarDisposable.run()发射事件

ObservableScalarXMap.ScalarDisposable.java

@Override
public void run() {
     if (get() == START && compareAndSet(START, ON_NEXT)) {

           // 持有下游的 Observer的引用,直接消费事件
           observer.onNext(value);
           if (get() == ON_NEXT) {
                 lazySet(ON_COMPLETE);
                 observer.onComplete();
           }
      }
}
image

在Android开发中rxjava部分是非常重要的;想要更深入学习或者更多Android核心技术,可以参考《Android核心技术手册》点击查看里面上千个技术知识。

文末

1)RxJava

有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

相关文章

网友评论

    本文标题:Android开发进阶——RxJava核心架构分析

    本文链接:https://www.haomeiwen.com/subject/haetkdtx.html