美文网首页
RxJava(十二)--subscribe()订阅解析

RxJava(十二)--subscribe()订阅解析

作者: azu_test | 来源:发表于2019-03-08 14:50 被阅读0次

介绍

只有调用了subscribe()方法,才能将Observable跟Observer关联起来,并触发Observable内处理器的执行。

执行代码

         //初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }
        });
         //初始化观察者Observer,视作结果接收器
        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };
        //订阅
        observable.subscribe(observer);

源码分析

1. 初始化被观察者Observable
  Observable  observable = Observable.create(数据处理器);

接下来会进入Observable#create方法

    Observable#create
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

RxJavaHooks.onCreate(f)此处不做分析了,知道最终返回的还是数据处理器即可

接下来会进入真正的初始化方法

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

由此可知被观察者Observable持有数据处理器对象Observable.OnSubscribe。

2. 初始化结果接受器观察者Observer
        Observer observer = new Observer<String>() {
           ...
        }
3. 订阅
        observable.subscribe(observer);

接下来回进入

    Observable#subscribe
    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }
    Observable#subscribe
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        //通知观察者做准备工作
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
        try {
            //获取数据处理器Observable.OnSubscribe,并做数据处理工作
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

上面方法的主要功能代码均已标注说明。RxJavaHooks.onObservableStart()方法是用来获取当前被观察者的数据执行器。然后调用数据处理器的call()方法,此方法就是外部用户自己实现的方法。call()方法传递的参数就是结果接收器观察者Observer。

            数据处理器内的cal()l方法
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("杨");
                subscriber.onNext("月");
                subscriber.onCompleted();
            }

上面方法中的subscriber就是结果接受器Observer,经过上面执行onNext()``onCompleted()方法会进入对应的Observer的方法内。

接着就会进入结果接收器Observer内方法体内

        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };

最终输出结果

结果: 杨
结果: 月
结束

相关文章

  • RxJava(十二)--subscribe()订阅解析

    介绍 只有调用了subscribe()方法,才能将Observable跟Observer关联起来,并触发Obser...

  • 10. RxJava

    RxJava Observable: 被观察者Observer: 观察者subscribe: 订阅observe...

  • 三大框架原理

    RxJava原理可总结为: 被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 ...

  • (二)RxJava笔记1

    RxJava有四个基本概念 被观察者 Observable 观察者 Observer 订阅 Subscribe ...

  • Redis 命令操作

    第一,发布订阅 1,subscribe - 订阅 命令:subscribe redisChat 2,publish...

  • RxLifecycle的替代方案

    RxJava 的 Observable subscribe() 后会返回 Subscription(RxJava2...

  • Rxjava2.x源码解析(二): 线程切换

    上一篇文章Rxjava2.x源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJav...

  • rxjava目录

    RxJava学习笔记: 1、RxJava->onCreate()与subscribe()2、RxJava->doO...

  • RxJava2.x关于subscribe()和subscribe

    由于在RxJava在2.x以上版本,api改动还是比较大的.其中订阅时有两个Api : subscribe和sub...

  • RxJava线程切换原理

    RxJava通过责任链的方式,将各个 操作符 节点串连起来。当调用订阅subscribe方法时,链上节点都会依赖订...

网友评论

      本文标题:RxJava(十二)--subscribe()订阅解析

      本文链接:https://www.haomeiwen.com/subject/ykyrpqtx.html