RxJava源码浅析一:构造数据源

作者: gatsby_dhn | 来源:发表于2016-10-03 15:37 被阅读86次

    接触了一段时间RxJava,对它的原理还是有些模糊,打算看下它的源码。

    支持原创,转载请注明出处。

    RxJava构造数据的方式大概有三种:

    1.create方法

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("参数1");
            subscriber.onNext("参数2");
            subscriber.onCompleted();
        }
    });
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    输出:
    onNext 参数1
    onNext 参数2
    onCompleted
    

    看下源码:

        public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(hook.onCreate(f));
        }
    

    我们创建了一个OnSubscribe对象,传递给create方法。
    hook.onCreate()方法

        public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
            return f;
        }
    

    原封不动返回OnSubscribe对象。

    protected Observable(OnSubscribe<T> f) {    this.onSubscribe = f;}
    

    所以创建完一个Observable会持有传入的OnSubscribe的引用。创建完Observable和OnSubscribe后调用observable.subscribe(subscriber)我们看下subscribe方法。

        public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
    
    
        static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
         // validate and proceed
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber can not be null");
            }
            if (observable.onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null.");
                /*
                 * the subscribe function can also be overridden but generally that's not the appropriate approach
                 * so I won't mention that in the exception
                 */
            }
            
            // new Subscriber so onStart it
            subscriber.onStart();
            
            /*
             * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
             * to user code from within an Observer"
             */
            // if not already wrapped
            if (!(subscriber instanceof SafeSubscriber)) {
                // assign to `observer` so we return the protected version
                subscriber = new SafeSubscriber<T>(subscriber);
            }
    
            // The code below is exactly the same an unsafeSubscribe but not used because it would 
            // add a significant depth to already huge call stacks.
            try {
                // hook.onSubscribeStart(observable, observable.onSubscribe)返回onSubscribe
                hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
                return hook.onSubscribeReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // in case the subscriber can't listen to exceptions anymore
                if (subscriber.isUnsubscribed()) {
                    RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
                } else {
                    // if an unhandled error occurs executing the onSubscribe we will propagate it
                    try {
                        subscriber.onError(hook.onSubscribeError(e));
                    } catch (Throwable e2) {
                        Exceptions.throwIfFatal(e2);
                        // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                        // so we are unable to propagate the error correctly and will just throw
                        RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                        // TODO could the hook be the cause of the error in the on error handling.
                        hook.onSubscribeError(r);
                        // TODO why aren't we throwing the hook's return value.
                        throw r;
                    }
                }
                return Subscriptions.unsubscribed();
            }
        }
    

    hook.onSubscribeStart(observable, observable.onSubscribe)返回onSubscribe,Onsubscribe是我们自己定义的我们看下它的call方法:

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("参数1");
            subscriber.onNext("参数2");
            subscriber.onCompleted();
        }
    });
    

    依次调用传入的subscriber的onNext,onNext和onCompleted方法。由此我们知道,OnSubscribe这个类控制着发送什么数据发送数据的次序

    2. from方法

    String[] array = new String[]{"参数1", "参数2"};
    Observable<String> observable = Observable.from(array);
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    输出:
    onNext 参数1
    onNext 参数2
    onCompleted
    

    我们看下from这个方法:

       public static <T> Observable<T> from(T[] array) {
           int n = array.length;
           if (n == 0) {
               return empty();
           } else
           if (n == 1) {
               return just(array[0]);
           }
           //这里用传入的数组构造一个OnSubscribeFromArray对象,这是一个OnSubscribe
           return create(new OnSubscribeFromArray<T>(array));
       }
    

    这里用传入的数组构造一个OnSubscribeFromArray对象,这是一个OnSubscribe,然后调用create方法,咦,又回到了第一种方法。我们看下OnSubscribeFromArray这个类。

    public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
        final T[] array;
        public OnSubscribeFromArray(T[] array) {
            this.array = array;
        }
        
        @Override
        public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }
        
        static final class FromArrayProducer<T>
        extends AtomicLong
        implements Producer {
            /** */
            private static final long serialVersionUID = 3534218984725836979L;
            
            final Subscriber<? super T> child;
            final T[] array;
            
            int index;
            
            public FromArrayProducer(Subscriber<? super T> child, T[] array) {
                this.child = child;
                this.array = array;
            }
            
            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
            
            void fastPath() {
                final Subscriber<? super T> child = this.child;
                
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
                    
                    child.onNext(t);
                }
                
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
            
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
                
                long e = 0L;
                int i = index;
    
                for (;;) {
                    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
                        
                        child.onNext(array[i]);
                        
                        i++;
                        
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
                        
                        r--;
                        e--;
                    }
                    
                    r = get() + e;
                    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }
        }
    }
    

    这个类的call方法

        @Override
        public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }
    

    创建了一个FromArrayProducer,然后传给Subscriber的setProducer方法。

        public void setProducer(Producer p) {
            long toRequest;
            boolean passToSubscriber = false;
            synchronized (this) {
                toRequest = requested;
                producer = p;
                if (subscriber != null) {
                    // middle operator ... we pass through unless a request has been made
                    if (toRequest == NOT_SET) {
                        // we pass through to the next producer as nothing has been requested
                        passToSubscriber = true;
                    }
                }
            }
            // do after releasing lock
            if (passToSubscriber) {
                subscriber.setProducer(producer);
            } else {
                // we execute the request with whatever has been requested (or Long.MAX_VALUE)
                if (toRequest == NOT_SET) {
                    producer.request(Long.MAX_VALUE);
                } else {
                    producer.request(toRequest);
                }
            }
        }
    

    这个方法调用传入的FromArrayProducer的request方法,我们看下这个方法:

            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
            
            void fastPath() {
                final Subscriber<? super T> child = this.child;
                
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
                    
                    child.onNext(t);
                }
                
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
            
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
                
                long e = 0L;
                int i = index;
    
                for (;;) {
                    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
                        
                        child.onNext(array[i]);
                        
                        i++;
                        
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
                        
                        r--;
                        e--;
                    }
                    
                    r = get() + e;
                    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }
        }
    

    slowPath方法中,依次遍历数组中每个元素,作为Subscriber.onNext的参数,遍历结束后调用Subscriber.onCompleted方法。和我们预期的一样。

    3.just方法

    Observable<String> observable = Observable.just("参数1", "参数2");
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("dhn", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(String s) {
            Log.i("dhn", "onNext " + s);
        }
    };
    observable.subscribe(subscriber);
    
    输出:
    onNext 参数1
    onNext 参数2
    onCompleted
    

    我们看下just方法:

        public static <T> Observable<T> just(T t1, T t2) {
            return from((T[])new Object[] { t1, t2 });
        }
    

    很简单,将传入的参数组成数组然后调用from方法,又回到了方法2。

    总结

    本文探讨了RxJava中常见的创建数据源的方法,数据变化的部分后续探讨。

    后面会将更多笔记整理成博客,欢迎关注。
    支持原创,转载请注明出处。

    相关文章

      网友评论

        本文标题:RxJava源码浅析一:构造数据源

        本文链接:https://www.haomeiwen.com/subject/owqjyttx.html