RxJava

作者: 内卷程序员 | 来源:发表于2021-09-30 12:30 被阅读0次

RxJava概述

  • RxJava 是一种响应式编程,来创建基于事件的异步操作库。基于事件流的链式调用、逻辑清晰简洁。
  • RxJava 我的理解是将事件从起点(上游)流向终点(下游),中间有很多卡片对数据进操作并传递,每个卡片获取上一个卡片传递下来的结果然后对事件进行处理然后将结果传递给下一个卡片,这样事件就从起点通过卡片一次次传递直到流向终点。

RxJava观察者模式

  • 传统观察者是一个被观察者多过观察者,当被观察者发生改变时候及时通知所有观察者
  • RXjava是一个观察者多个被观察者,被观察者像链条一样串起来,数据在被观察者之间朝着一个方向传递,直到传递给观察者 。

RxJava原理理解

  • 被观察者通过订阅将事件按顺序依次传递给观察者,


    image.png
//RxAndroid中包含RxJava的内容,只引入RxAndroid还是会报错
dependencies {
    ......
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
image.png

创建Observer(观察者)

        Observer<Integer> observer = new Observer<Integer>() {
 
            // 观察者接收事件前  ,当 Observable 被订阅时,观察者onSubscribe方法会自动被调用 
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }

            // 当被观察者生产Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件作出响应" + value);
            }

            // 当被观察者生产Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            // 当被观察者生产Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };
       //Subscriber类 = RxJava 内置的一个实现了 Observer 的抽象类,对 Observer 接口进行了扩展 
       Subscriber<Integer> subscriber = new Subscriber<Integer>() {

           // 观察者接收事件前 ,当 Observable 被订阅时,观察者onSubscribe方法会自动被调用 
            @Override
            public void onSubscribe(Disposable d) { 
                Log.d(TAG, "开始采用subscribe连接");
            }

            // 当被观察者生产Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件作出响应" + value);
            }

            // 当被观察者生产Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            // 当被观察者生产Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        };

Subscriber 抽象类与Observer 接口的区别

  • 二者基本使用方式一致(在RxJava的subscribe过程中,Observer会先被转换成Subscriber再使用)
  • Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:
    1. onStart():在还未响应事件前调用,用于做一些初始化工作,他是在subscribe 所在的线程调用,不能切换线程,所以不能进行界面UI更新比如弹框这些。
    2. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收响应事件,比如在onStop方法中可以调用此方法结束订阅。调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用。

创建 Observable (被观察者)

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通过 ObservableEmitter类对象产生事件并通知观察者
                // ObservableEmitter:定义需要发送的事件 & 向观察者发送事件
                     
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

RxJava 提供了其他方法用于 创建被观察者对象Observable

// 方法1:just(T...):直接将传入的参数依次发送出来
  Observable observable = Observable.just("A", "B", "C");
  // 将会依次调用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

// 方法2:fromArray(T[]) / from(Iterable<? extends T>) : 将传入的数组 / Iterable 拆分成具体对象后,依次发送出来
  String[] words = {"A", "B", "C"};
  Observable observable = Observable.fromArray(words);
  // 将会依次调用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

以上两种方法创建出来的观察者都是继承Observable,比如ObservableCreate、ObservableFromArray、ObservableMap...,

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

--------------------------------------------------------------------------------------------------------

public abstract class Observable<T> implements ObservableSource<T> {

   ...

    protected abstract void subscribeActual(Observer<? super T> observer);
 
    @Override
    public final void subscribe(Observer<? super T> observer) {
     ...
        try {
            ...
            subscribeActual(observer);
        }  catch (Throwable e) {
           ...
        }
    }
}

public final class ObservableCreate<T> extends Observable<T> {

   final ObservableOnSubscribe<T> source;

   @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

   static final class CreateEmitter<T> extends AtomicReference<Disposable> 
 implements ObservableEmitter<T>, Disposable {
  
 ...

        @Override
        public void onNext(T t) {
            if (t == null) {
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        ...
    }
}

public final class ObservableFromArray<T> extends Observable<T> {

    final T[] array;
   
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual; //对应观察者

        final T[] array;
        
        ...

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    ... 

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), 
                      "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),
                      "The mapper function returned a null value.") : null;
        }
    }
}

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

---------------------------------------------------------------------------------------------

 public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        final Observer<? super T> observer;

        final T value;
  
        @Override
        public void dispose() {
            set(ON_COMPLETE);
        }

       ....
         
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

观察者和被观察者通过subscribe订阅,订阅完成后被观察者就可以像观察者发送数据

 
        Observable.create(new ObservableOnSubscribe<Integer>() {
       
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
   
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });
    }
}

image.png

链式调用


image.png
     Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(String s) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

这个订阅的过程就如同洋葱一样一层层封装,当订阅完成后就像剥洋葱一样一层层剥,用发射器发送数据,用onNext方法一层层发送,发送给每一层的时候就回调每一层的Function类apply方法,这个方法由开发者实现,该方法处理数据后就返回处理后的数据,然后数据又往下一层传递,直到传递到观察者手里,然后观察者接收数据


image.png

相关文章

网友评论

    本文标题:RxJava

    本文链接:https://www.haomeiwen.com/subject/peqrnltx.html