美文网首页 移动 前端 Python Android Java
Android RxJava基本使用及原理

Android RxJava基本使用及原理

作者: 1a473fcb13b0 | 来源:发表于2019-01-11 20:51 被阅读463次

RxJava 官网地址:http://reactivex.io/documentation/operators.html
RxJava 中文翻译:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava Github: https://github.com/ReactiveX/RxJava
RxAndroid Github:https://github.com/ReactiveX/RxAndroid

RxJava作用

一个词:异步
前面我们已经讲到了Android retrofit基本使用及原理,在数据请求到后,onResponse方法是在子线程内,内不能进行UI更新,故而需要用Handler或者runOnUiThread进行UI更新,在我们学习到RxJava后就可以才用一种更加简洁的方式去处理异步调用了。

RxJava好处

一个词:简洁
随着程序逻辑变得越来越复杂,它依然能够保持简洁。

操作符和基本使用方法参见下面文字和demo

参考文章
https://www.jianshu.com/p/cd984dd5aae8
参考示例
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
https://github.com/nanchen2251/RxJava2Examples

1、引入依赖:

    implementation 'io.reactivex.rxjava2:rxjava:2.2.5'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

2、操作基本步骤
①、创建被观察者--发送数据

        Observable  observable=Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                emitter.onNext("what");
                emitter.onNext("do");
                emitter.onNext("you");
                emitter.onNext("ask");
                emitter.onNext("me");
                emitter.onComplete();
            }
        });

②、创建观察者--接收数据

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //d.dispose();//用于销毁资源
                Log.d("onSubscribe-onNext", "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d("onSubscribe-onNext", s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("onSubscribe-onError", e.toString());
            }

            @Override
            public void onComplete() {
                Log.d("onSubscribe-onComplete", "onComplete");
            }
        };

③、产生订阅关系

        observable.subscribe(observer);

测试结果:

01-11 19:24:58.810 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: onSubscribe
01-11 19:24:58.810 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: what
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: do
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: you
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: ask
01-11 19:24:58.812 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: me
01-11 19:24:58.812 30211-30211/com.hzy.androidrxjava D/onSubscribe-onComplete: onComplete

RxJava原理

Observable.create()方法

    /**
     * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style 
     * 提供一个API 通过被观察者将反应式世界与回调式世界连接起来。
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//调用hook关联方法;
    }
    /**
     * Calls the associated hook function.
     * 此处将source返回通过Observable<T>去接收
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

ObservableOnSubscribe是一个接口,定义了subscribe()方法,用来接收ObservableEmitter的实例,以安全,可取消的方式发送事件;

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

ObservableEmitter是Emitter(发射器)的子类;ObservableEmitter有序的发送onNext(),onError(),onComplete()方法;通过serialize()方法确保按照你想要的顺序发送;

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    @NonNull
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

Observer是一个接口,为接收基于推的通知提供了一种机制,在observable.subscribe(observer)之后,最先调用onSubscribe(),然后是数量不等的onNext(),最后仅调用一次onComplete() 或者onError(),这两个相互冲突,只有一个执行;在onSubscribe可以销毁资源,调用d.dispose();调用之后,后面的方法不再执行;

public interface Observer<T> {
  void onSubscribe(@NonNull Disposable d);
  void onNext(@NonNull T t);
  void onError(@NonNull Throwable e);
  void onComplete();
}

subscribe()中的subscribeActual(observer)在Observable是抽象方法,实际的订阅方法;操作符应该实现执行必要业务逻辑的方法,我们看下ObservableCreate类

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);//实际的订阅方法;
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

ObservableCreate是Observable的子类,重写了subscribeActual()方法,

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //CreateEmitter是静态匿名内部类,实现了ObservableEmitter和Disposable接口;
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //observer调用onSubscribe(),可以销毁资源
    observer.onSubscribe(parent);

    try {
         //执行subscribe()方法
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

CreateEmitter内有一个serialize()方法,确保按照你想要的顺序发送事件,返回了一个SerializedEmitter实例;

    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }

SerializedEmitter类连续的调用onNext, onError and onComplete,发送事件;在发送事件之前都会判断emitter是否销毁资源或者是否已经调用了onError或者onComplete;如果已经调用,就不在发送事件;
以下是SerializedEmitter发送事件的逻辑判断:

    SerializedEmitter(ObservableEmitter<T> emitter) {//构造函数
        this.emitter = emitter;
        this.error = new AtomicThrowable();
        this.queue = new SpscLinkedArrayQueue<T>(16);//事件缓存数量为16;
    }

    @Override
    public void onNext(T t) {//SerializedEmitter的onNext()
        if (emitter.isDisposed() || done) {//销毁资源或者done=true(已经执行了onComplete或者)
            return;
        }
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (get() == 0 && compareAndSet(0, 1)) {// 调用Native方法compareAndSet,执行CAS操作
            emitter.onNext(t);//发送onNext()方法
            if (decrementAndGet() == 0) {
                return;
            }
        } else {
            SimpleQueue<T> q = queue;
            synchronized (q) {
                q.offer(t);//offer添加一个元素并返回true如果队列已满,则返回false
            }
            if (getAndIncrement() != 0) {
                return;
            }
        }
        drainLoop();
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {//onError根据tryOnError的返回值
        if (emitter.isDisposed() || done) {//如果已经销毁子资源或者done= true;
            return false;
        }
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (error.addThrowable(t)) {//抛出异常
            done = true;
            drain();//drain()内部调用drainLoop()方法
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (emitter.isDisposed() || done) {//资源销毁或者done= true
            return;
        }
        done = true;
        drain();
    }

    void drain() {
        if (getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {//drainLoop()方法,内部双层无限循环,
        ObservableEmitter<T> e = emitter;
        SpscLinkedArrayQueue<T> q = queue;
        AtomicThrowable error = this.error;
        int missed = 1;
        for (;;) {

            for (;;) {
                if (e.isDisposed()) {//如果已经销毁资源,清空队列,退出循环
                    q.clear();
                    return;
                }

                if (error.get() != null) {//error不为null,队列清空,调用onError()方法,退出循环
                    q.clear();
                    e.onError(error.terminate());
                    return;
                }

                boolean d = done;
                T v = q.poll();//poll 移除并返回队列头部的元素    如果队列为空,则返回null

                boolean empty = v == null;

                if (d && empty) {//如果done= true;并且当前事件为nul,调用onComplete()方法,退出循环
                    e.onComplete();
                    return;
                }

                if (empty) {//当前事件为null;
                    break;
                }

                e.onNext(v);//调用onNext()
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

参考文章
https://gank.io/post/560e15be2dca930e00da1083
https://www.jianshu.com/p/a406b94f3188
https://blog.csdn.net/firelion0725/article/details/51093512
https://github.com/rengwuxian/RxJavaSamples
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
https://github.com/nanchen2251/RxJava2Examples
https://blog.csdn.net/fengluoye2012/article/details/79149201

相关文章

网友评论

    本文标题:Android RxJava基本使用及原理

    本文链接:https://www.haomeiwen.com/subject/oisxdqtx.html