美文网首页Android-Rxjava&retrofit&dagger
谜之RxJava (一) —— 最基本的观察者模式

谜之RxJava (一) —— 最基本的观察者模式

作者: Gemini | 来源:发表于2015-11-27 11:40 被阅读176次

    最近在Android界,最火的framework大概就是RxJava了。
    扔物线大大之前写了一篇文章 《给 Android 开发者的 RxJava 详解》,在我学习RxJava的过程中受益匪浅。经过阅读这篇文章后,我们来看下RxJava的源码,揭开它神秘的面纱。

    这里准备分几篇文章写,为了能让自己有个喘口气的机会。

    先来上个最最简单的,经典的Demo。

    Demo

    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello");
        }
    }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d("rx", s);
        }
    });
    

    这段代码产生的最终结果就是在Log里会出现hello。

    看下这段代码的具体流程吧。
    这里有2个函数createsubscribe,我们看看create里面看了啥。

    OnSubscribe对象

    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }
    // constructor
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    

    这里的hook是一个默认实现,里面不做任何事,就是返回f。我们看见create只是给ObservableonSubscribe赋值了我们定义的OnSubscribe

    Subscriber对象

    来看下subscribe这个函数做了什么事

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }
    
        // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (OnErrorNotImplementedException e2) {
                // special handling when onError is not implemented ... we just rethrow
                throw e2;
            } catch (Throwable e2) {
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }
    

    我们看到,这里我们的subscriberSafeSubscriber包裹了一层。

    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    

    然后开始执行工作流

    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
    

    默认的hook只是返回我们之前定义的onSubscribe,这里调用的call方法就是我们在外面定义的

    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello");
        }
    })
    

    我们调用传入的subscriber对象的onNext方法,这里的subscriberSafeSubscriber
    SafeScriber

    public void onNext(T args) {
        try {
            if (!done) {
                actual.onNext(args);
            }
        } catch (Throwable e) {
            // we handle here instead of another method so we don't add stacks to the frame
            // which can prevent it from being able to handle StackOverflow
            Exceptions.throwIfFatal(e);
            // handle errors if the onNext implementation fails, not just if the Observable fails
            onError(e);
        }
    }
    

    actual就是我们自己定义的subscriber。 原来SafeSubscriber只是为了帮我们处理好异常,以及防止工作流的重复。

    这是RxJava最最基本的工作流,让我们认识到他是怎么工作的。之后我们来讲讲其中的细节和其他神奇的内容。

    【谜之RxJava (二) —— Magic Lift】

    欢迎关注我Github 以及 weibo@Gemini

    相关文章

      网友评论

        本文标题:谜之RxJava (一) —— 最基本的观察者模式

        本文链接:https://www.haomeiwen.com/subject/xvudhttx.html