美文网首页
RxJava2 学习笔记(一)

RxJava2 学习笔记(一)

作者: 爱学习的小宝宝 | 来源:发表于2017-11-18 03:08 被阅读0次

简单的写一点自己理解的源码分析
最简例1:

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //e 就是包装后的观察者对象,请往后看
                e.onNext("1");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(String value) {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

响应式
-Observable 被观察者
-Observer 观察者/订阅者
Observable.create创建了一个Observable对象,参数是ObservableOnSubscribe接口的实现,并在Observable里保存ObservableOnSubscribe的引用source。这个引用对象为方法,所以使用接口形式实现。

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//非空判断
        //onAssembly hook相关可忽略
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
//创建的ObservableCreate就是一个Observable对象实现了subscribeActual(Observer<? super T> observer)方法
public final class ObservableCreate<T> extends Observable<T>{
    final ObservableOnSubscribe<T> source;//源引用
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {//重要!!!订阅时执行
       // 包装observer 当传入observer时CreateEmitter开始发送数据
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);//onSubscribe(Disposable d)  CreateEmitter也实现了Disposable 
        try {
            //重要!!!使观察者和被观察者相关联
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }}

CreateEmitter实现了ObservableEmitter 译为被观察者的发射器,当被观察者订阅观察者的时候(以代码流程直译的,应该理解为观察者订阅被观察者)也就是Observable.subscribe的时候,源source会调用subscribe将订阅的观察者observer作为参数传进去,使观察者和被观察者建立关系,此时数据开始传递。

//装饰类包装observer,添加了isDisposed标识是否可用相关的东西
//实现了ObservableEmitter,所以可以作为ObservableOnSubscribe的参数。
 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onError(Throwable t) {
              //先调用onError ,onComplete不会被执行
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
        @Override
        public void onComplete() {
            //先调用onComplete 再调用onError 会crash
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    }
//作为Observable.create的参数
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
//是一个装饰接口,继承Emitter,在源码里包装了一次Observer
public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

调用Observable.subscribe(Observer<? super T> observer) 完成订阅

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        subscribeActual(observer);
      
    }

例2:Observable.map(Function<? super T, ? extends R> mapper)

//将String变换成Integer
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });;

首先看map方法干了什么

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //不用想ObservableMap也应该是一个Observable对象
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
//AbstractObservableWithUpstream继承Observable 里面只有一个成员ObservableSource<T> source的引用
//Observable 实现了ObservableSource所以这2个可以看成是一个东西,这个引用就是保存上游的Observable 引用
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        //MapObserver为Observer即订阅者 t:下游的Observer | function:map()参数
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}

ObservableMap的subscribeActual()
参考上面的简单例子Observable .subscribe参数为Observer即订阅者,所以MapObserver肯定实现了Observer,那么source.subscribe这个方法调用的时候就会调用上游源Observable 的subscribeActual()方法,将MapObserver包装成ObservableEmitter,那么当调用e.onNext("1")的时候,即调用了MapObserver.onNext("1"); 看下MapObserver的onNext做了什么

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);//actual为下游的Observer引用
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                //mapper 就是map()里传入的Function接口的实现。实现变换
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //下游Observer继续执行onNext
            actual.onNext(v);
        }
    }

Function 接口

public interface Function<T, R> {
    //很简单,给我一个T还你一个R  
    R apply(@NonNull T t) throws Exception;
}

到此,源Observable 和 ObservableMap已经做好准备,ObservableMap的MapObserver也已经实现,就等最下游Observable .subscribe调用了,调用后反向执行Observable 的subscribeActual()方法,然后数据正向开始传递。
之前见有人说Observable 在订阅前的每一个点(.)都是创建了一个Observable 对象。这么看来还是很贴切的,rx的各种变换和线程切换也就不难理解了。具体后面看完再做总结。

相关文章

  • RxJava2笔记(五、订阅流程梳理以及线程切换次数有效性)

    在前面的几篇文章RxJava2笔记(一、事件订阅流程)RxJava2笔记(二、事件取消流程)RxJava2笔记(三...

  • Android Develop——RxJava2(二) RxJa

    在RxJava2(一)教程中,已经跟着大神们学习了RxJava2的基本使用,现在我们来学习一下RxJava2很强大...

  • Rxjava2学习笔记(一)

    网上大佬们都整理得很好了,只是老看,总觉得差点啥,所以,就想着自己再捋一遍,加深印象。给 Android 开发者的...

  • RxJava2 学习笔记(一)

    简单的写一点自己理解的源码分析最简例1: 响应式-Observable 被观察者-Observer 观察者...

  • RxJava2笔记(三、订阅线程切换)

    在前面两篇文章RxJava2笔记(一、事件订阅流程)和RxJava2笔记(二、事件取消流程)中,我们分别了解了事件...

  • RxJava2 学习笔记

    函数响应式编程 函数响应式编程的思维是将问题抽象为数据加工,一切问题都是数据源发出数据的问题,所以用函数响应式编程...

  • RxJava2学习笔记

    intro "森林里的一棵树倒下来,如果周围没有人听见,那么就等于说树的倒下是寂静无声的." 随着产品功能的增加,...

  • rxjava2 学习笔记

    特点 链式调用 线程切换 操作符 创建操作符 转换操作符 过滤操作符 组合操作符 错误处理操作符 辅助性操作符 条...

  • RxJava2学习笔记

    本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。 RxJava的组成 被观察者-------O...

  • RxJava2学习笔记

    我们为什么选择RxJava Infinite Streams 无限流 Asynchronous execution...

网友评论

      本文标题:RxJava2 学习笔记(一)

      本文链接:https://www.haomeiwen.com/subject/zhgsvxtx.html