美文网首页
Rxjava-源码浅尝

Rxjava-源码浅尝

作者: Lei_9c47 | 来源:发表于2018-07-23 16:10 被阅读0次
        //被观察者
        Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("Jenchar");
                subscriber.onCompleted();
            }
        });
        //观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.w(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.w(TAG, "onError");
            }

            @Override
            public void onNext(String s) {
                Log.w(TAG, "onNext");
            }
        };
        //订阅
        stringObservable.subscribe(observer);

通过Observable的create静态方法传入OnSubscribe的实例,OnSubscribe继承Action1并且实现了其call方法,
传入的泛型为OnSubscribe<String>的泛型,Action1<T>中传入了Subscriber<T>,泛型作为T被层层传递

public interface Action1<T> extends Action {
    void call(T t);
}

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
}

从Action1<Subscriber<? super T>>这个接口可以看出泛型已经指定为Subscriber<? super T>,这个Subscriber泛型怎么传递进来的呢?

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
     static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
          //省略
         }
            return Subscriptions.unsubscribed();
        }
    }

以上这段源码为链式结构最后的执行方法,可以看到subscriber最终是通过 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)传入,让我们看看这个方法里面怎么操作

    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

最终返回的还是onSubscribe对象,再调用继承了Action1的call方法传入了该subscriber指定了泛型,形成了观察者的传递,subscriber实现Observer接口。

public interface Observer<T> {

    void onCompleted();

  
    void onError(Throwable e);

   
    void onNext(T t);

}

public abstract class Subscriber<T> implements Observer<T>, Subscription 
以下代码省略

还有一点值得注意,在RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);之前还会将subscriber转成SafeSubscriber,这一步的主要作用就是保证onComplete和onError方法只执行一次,并且在onNext方法中如果trycatch到不在判断范围内的异常不会抛出而是会进行捕捉调用onError终止

 @Override
    public void onNext(T t) {
        try {
            if (!done) {
                actual.onNext(t);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwOrReport(e, this);
        }
    }
    
 public static void throwOrReport(Throwable t, Observer<?> o) {
        Exceptions.throwIfFatal(t);
        o.onError(t);
    }

 public static void throwIfFatal(Throwable t) {
        if (t instanceof OnErrorNotImplementedException) {
            throw (OnErrorNotImplementedException) t;
        } else if (t instanceof OnErrorFailedException) {
            throw (OnErrorFailedException) t;
        } else if (t instanceof OnCompletedFailedException) {
            throw (OnCompletedFailedException) t;
        }
        // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
        else if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        } else if (t instanceof ThreadDeath) {
            throw (ThreadDeath) t;
        } else if (t instanceof LinkageError) {
            throw (LinkageError) t;
        }
    }

相关文章

  • Rxjava-源码浅尝

    通过Observable的create静态方法传入OnSubscribe的实例,OnSubscribe继承Acti...

  • rxjava目录

    RxJava学习笔记: 1、RxJava->onCreate()与subscribe()2、RxJava->doO...

  • Android函数响应式编程最新RxJava-操作符入门(2)

    Android函数响应式编程最新RxJava-基本用法Android函数响应式编程最新RxJava-操作符入门(1...

  • 浅尝lerna version源码

    lerna version主要的工作为标识出在上一个 tag 版本以来更新的 monorepo package,然...

  • RxJava-基本订阅流程源码分析

    前言:学习了这么多天的RxJava系列文章,虽然会用了,但是确不懂的具体是怎么回事,所以说会用的话还是不行,要去了...

  • Android函数响应式编程最新RxJava-操作符入门(1)

    Android函数响应式编程最新RxJava-基本用法 Rxjava操作符包括创建操作符、变换操作符、过滤操作符、...

  • 浅尝

    昨天夜里,失眠,拿起手机看了电影《七月与安生》,看完,便在沉沉的思虑中渐渐睡去 于是我也便就更喜欢改编版的电影里安...

  • 浅尝

    格子间里的人们终于在夜幕的催促下离开了白昼的战场,又急匆匆的淹没在华灯初上的车水马龙中。长长来路打翻万般思绪,风拂...

  • 浅尝

    今天在那刷网课,我妈过说这课挺有意思的,你认真听。突然想起我爸也说过类似的话。他们好像总和我不在一个频道上。 其实...

  • 浅尝

    从开始动手练习画画后,线描,水彩都尝试了,这两天听一个免费的国画体验课,因为太忙,只能放着听听,偶尔誊出手来截个屏...

网友评论

      本文标题:Rxjava-源码浅尝

      本文链接:https://www.haomeiwen.com/subject/rnsjmftx.html