美文网首页
Rxjava2.2.1(2) create-map-subscr

Rxjava2.2.1(2) create-map-subscr

作者: 其勇勇 | 来源:发表于2019-08-07 11:21 被阅读0次

rxjava代码

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("有情况");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        if("有情况".equals(s)){
            return 70;
        }
        return 10;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.e("rxjava","危险等级" + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

上面代码我们可以做个设定,比如被观察者是个间谍,他发现有情况,然后向观察者报信,但是如果直接报信太危险,于是乎他们密码本,把发现的情况转变成密码再给观察者,为了更方便看代码,我把rxjava的内部类给抽出来

ObservableOnSubscribe<String> observableOnSubscribe = new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("有情况");
    }
};
Function<String,Integer> function = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        if("有情况".equals(s)){
            return 70;
        }
        return 10;
    }
};
Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.e("rxjava","危险等级" + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};
Observable.create(observableOnSubscribe).map(function).subscribe(observer);

角色

observableOnSubscribe :被观察者、间谍、有情况报信
function :密码本,负责把间谍信息转换成密码(这里把string转换成integer)
observer : 观察者,负责拿到密码

好,下面开始看代码

1、首先看Observable.create方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //就是判空
    ObjectHelper.requireNonNull(source, "source is null");
    //这个地方返回Observable本身
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

2、再看RxJavaPlugins.onAssembly方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    //这个地方onObservableAssembly我没有做过赋值,所以为空,所以原样返回source
    return source;
}

3、那么就直接看1步骤里的new ObservableCreate<T>(source)),进入代码

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

这里发现就是做一个赋值操作,把我们创建的被观察者给了source
(其实1、2、3步骤照抄上一篇文章:https://www.jianshu.com/p/9e0a3ee4e0b3
4、再看Observable的map方法

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //这里原样返回 new ObservableMap<T, R>(this, mapper)
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

首先看这里返回的是ObservableMap,这里的角色要分清楚,即Observable.create(observableOnSubscribe).map(function).subscribe(observer);中调用subscribe方法的不再是create的那个observable,而是这个new的ObservableMap,这里创建ObservableMap传的入参,第一个this就是发送String的Observal,mapper就是密码本,就是我们自己new的那个function ,负责把String转变成integer。
我们进入它的构造函数

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}

再进入这个super(source)

AbstractObservableWithUpstream(ObservableSource<T> source) {
    this.source = source;
}

我们发现,这个地方就是个赋值的操作,最后ObservableMap拥有了发送String数据的Observable的引用,同时还拥有了转换器(密码本也行,就是我们定义的那个function)的引用,自此,map的代码我们告一段落

5、我们再看Observable.create(observableOnSubscribe).map(function).subscribe(observer)中的subscribe方法,我们上面说了,因为map返回的是我们new的ObservableMap对象,所以这里调用subscribe方法的是这个ObservableMap,我们进入ObservableMap中的subscribe方法(其实还是进入了它的父类Observable的subscribe方法中)。

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

从代码中我们看到,还是调用抽象方法subscribeActual,
6、这个时候再回到ObservableMap的subscribeActual方法

public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}

好,这个地方注意了,我们先看他们的角色
参数t:就是我们自己new的那个 Integer 的观察者
source:就是我们步骤4最后面赋值的那个source,就是Observable.create的那个Observable,它里面放的被观察者发送的数据是String
function:转换器(也就是我们自己说的那个密码本)
然后看这里的操作
source调用它的subscribe方法,直接放了一个new的MapObserver对象,我们进入它的构造方法

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
    super(actual);
    this.mapper = mapper;
}

再看一下super(actual);

public BasicFuseableObserver(Observer<? super R> downstream) {
    this.downstream = downstream;
}

到这里为止,我们发现,我们自己new的那个 Integer 的观察者,赋值给了downstream,我们自己写的转换器(密码本、function)赋值给了mapper
好,以上相当于初始化工作都已经结束啦,下面再把发送数据的流程代码说一遍
7、这个时候,我们的间谍,用于发送String数据的被观察者发送了数据emitter.onNext("有情况")
我们进入onNext的方法里

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

看,其实还是调用了observer.onNext(t),我们需要清楚的是,这个observer是谁??我们看看这个observer是怎么来的
首先CreateEmitter的构造函数传进来的,那又是哪里创建的CreateEmitter对象呢,是在 ObservableCreate 的 subscribeActual 方法里,而哪里调用了这个 subscribeActual 方法呢?我们知道subscribeActual 是抽象函数,是在subscribe里调用的,那哪里调用了这个 subscribe方法呢?其实前面已经说过了,就在步骤6那里,不错,就是这个source.subscribe(new MapObserver<T, U>(t, function))
好了,饶了这么一大圈,其实emitter.onNext("有情况"),然后调用了observer.onNext(t),这个observer就是我们上面new的那个MapObserver
好,我们此时进入MapObserver的onNext方法

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }

    U v;

    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}

看关键代码
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
这里需要注意,t是间谍发送的“有情况”字符串
mapper.apply()就是我们自己定义的转换器(密码本,就是那个function)
然后这里就把这个String数据转换成了v,v就是我们观察者需要的integer呀
再看关键代码
downstream.onNext(v);
这里的downstream是谁??看步骤6的最后,这个downstream就是我们自己new的那个Integer的观察者呀,然后它就拿到信息了呀
一套流程打完收工!!

相关文章

  • Rxjava2.2.1(2) create-map-subscr

    rxjava代码 上面代码我们可以做个设定,比如被观察者是个间谍,他发现有情况,然后向观察者报信,但是如果直接报信...

  • Rxjava2.2.1(3) subscribeOn 线程切换-

    rxjava代码 由前面两篇文章我们得知,如果不指定线程切换,那么我们在哪个线程操作,事件的发送和接收就发生在哪个...

  • Rxjava2.2.1(1) create-subscribe源

    rxjava 代码 首先角色观察者:observer被观察者:observableOnSubscribe这个代码其...

  • Rxjava2.2.1(4) observeOn 线程切换-源码

    rxjava代码 然后create和subscribe也不讲了(可以看前面文章)1、直接看observeOn 再进...

  • DAY 2(2/2)

    五彩滩声名在外,但是我们去的时候在休整,我们十分不甘心,根据各种攻略告诉我们在景区出口有村民守着问你要不要去五彩滩...

  • 2-2-2

    自由写作群 转化与蜕变 继续刚才的梦的后记 我想梦是用最形象的比喻告诉我内在正在经历着发生着什么,这是潜意识里已经...

  • 2 (2)

    突然想到Jenny ,那个有些神经质的女孩儿。 对我来说,Jenny 给我最深的印象是作家。作为一个作家,她的灵感...

  • 2-2-2 RelativeLayout

    标注:本文为个人整理,仅做自己学习参考使用,请勿转载和转发2018-06-03: 初稿,参考博主coder-pig...

  • 2️⃣0️⃣2️⃣0️⃣🔚🔜2️⃣0️⃣2️⃣1️⃣

    今天风小了,夕阳很平静,但2020年终究是不平静的一年。 不平静的2020年,第一次有了一张小区出入证。不能飞去热...

  • 2-2

    ❤️起步,️️(若起步的右车道前方无车,可以不用转到左车道; 转发了右车道一定要变更车道) 一段车程 ❤️右转,右...

网友评论

      本文标题:Rxjava2.2.1(2) create-map-subscr

      本文链接:https://www.haomeiwen.com/subject/tubndctx.html