美文网首页
RxJava源码分析

RxJava源码分析

作者: yangweigbh | 来源:发表于2017-02-18 13:17 被阅读26次

    从创建一个数组的Observable说起:

    /**
     * Creates an Observable from an array by wrapping it in an
     * {@code OnSubscribeFromArray} and passing that to {@code create}.
     * Part of the body is elided in this listing (the "......" line).
     *
     * @param array the elements to emit
     * @return an Observable backed by {@code array}
     */
    public static <T> Observable<T> from(T[] array) {
        ......
        return create(new OnSubscribeFromArray<T>(array));
    }
    
    /**
     * Builds an Observable around the given OnSubscribe after routing it through
     * {@code RxJavaHooks.onCreate}.
     *
     * @param f the subscription logic to run for each subscriber
     * @return a new Observable using the hooked OnSubscribe
     */
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        // RxJavaHooks offers a hook that may rewrite the OnSubscribe; for this
        // walkthrough it can be treated as returning its argument unchanged.
        final OnSubscribe<T> hooked = RxJavaHooks.onCreate(f);
        return new Observable<T>(hooked);
    }
    
    /**
     * Stores the OnSubscribe that is invoked when a subscriber subscribes.
     *
     * @param f the subscription logic kept in {@code onSubscribe}
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    

    OnSubscribeFromArray:

    public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
        final T[] array;
        /**
         * Keeps a reference to the array whose elements will be emitted.
         *
         * @param array the source elements (stored as-is, not copied)
         */
        public OnSubscribeFromArray(T[] array) {
            this.array = array;
        }
    
        /**
         * Runs on subscription: gives the child a new {@code FromArrayProducer}
         * over this array through {@code setProducer}.
         *
         * @param child the subscriber that will receive the array elements
         */
        @Override
        public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }
    

    当调用Observable的subscribe时,如果传入的Observer本身不是Subscriber,会创建一个ObserverSubscriber来包装这个Observer

        /**
         * Subscribes an Observer. An Observer that already is a Subscriber is used
         * directly; any other Observer is wrapped in an {@code ObserverSubscriber}.
         *
         * @param observer the observer to subscribe
         * @return the Subscription produced by the Subscriber-based overload
         * @throws NullPointerException if {@code observer} is null
         */
        public final Subscription subscribe(final Observer<? super T> observer) {
            // null is never an instance of Subscriber, so checking it first is equivalent
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            final Subscriber<? super T> target;
            if (observer instanceof Subscriber) {
                target = (Subscriber<? super T>)observer;
            } else {
                target = new ObserverSubscriber<T>(observer);
            }
            return subscribe(target);
        }
    
        /**
         * Subscribes the given Subscriber to this Observable by delegating to the
         * static {@code Observable.subscribe(subscriber, observable)} helper.
         *
         * @param subscriber the subscriber to attach
         * @return the Subscription returned by the static helper
         */
        public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
    
        /**
         * Connects {@code subscriber} to {@code observable} and returns the resulting Subscription.
         *
         * <p>Steps visible in this listing: call {@code onStart()} on the subscriber, wrap it in a
         * {@code SafeSubscriber} unless it already is one, then invoke
         * {@code observable.onSubscribe.call(subscriber)}. If that call throws, the error is passed
         * to the subscriber's {@code onError}, or to {@code RxJavaHooks.onError} when the subscriber
         * reports it is already unsubscribed, and {@code Subscriptions.unsubscribed()} is returned.
         *
         * @param subscriber the subscriber to attach; locally replaced by its SafeSubscriber wrapper if needed
         * @param observable the observable whose {@code onSubscribe} is invoked
         * @return the (possibly wrapped) subscriber on success, otherwise {@code Subscriptions.unsubscribed()}
         * @throws OnErrorFailedException if {@code onSubscribe} fails and the subscriber's {@code onError} then fails too
         */
        static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        
            // notify the subscriber that it is about to be subscribed (done before any wrapping)
            subscriber.onStart();
    
            /*
             * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
             * to user code from within an Observer"
             */
            // if not already wrapped
            if (!(subscriber instanceof SafeSubscriber)) {
                // reassign `subscriber` so the protected wrapper is what gets called and returned
                subscriber = new SafeSubscriber<T>(subscriber);
            }
    
            // The code below is exactly the same an unsafeSubscribe but not used because it would
            // add a significant depth to already huge call stacks.
            try {
                // run the source's subscription logic against the (wrapped) subscriber
                observable.onSubscribe.call(subscriber);
                return subscriber;
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                // (presumably rethrows errors considered fatal -- see Exceptions.throwIfFatal)
                Exceptions.throwIfFatal(e);
                // in case the subscriber can't listen to exceptions anymore
                if (subscriber.isUnsubscribed()) {
                    RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
                } else {
                    // if an unhandled error occurs executing the onSubscribe we will propagate it
                    try {
                        subscriber.onError(RxJavaHooks.onObservableError(e));
                    } catch (Throwable e2) {
                        Exceptions.throwIfFatal(e2);
                        // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                        // so we are unable to propagate the error correctly and will just throw
                        RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                        // TODO could the hook be the cause of the error in the on error handling.
                        RxJavaHooks.onObservableError(r);
                        // TODO why aren't we throwing the hook's return value.
                        throw r; // NOPMD
                    }
                }
                // the error was delivered, so hand back a Subscription from Subscriptions.unsubscribed()
                return Subscriptions.unsubscribed();
            }
        }
    

    先调用subscriber的onStart,然后用SafeSubscriber包装subscriber。SafeSubscriber是为了保证onError和onCompleted只会执行其中之一,并且执行后不再执行onNext

    接着调用OnSubscribeFromArray.call(subscriber)

        /**
         * Creates a {@code FromArrayProducer} for the child over this array and
         * registers it with the child through {@code setProducer}.
         *
         * @param child the subscriber that will receive the array elements
         */
        @Override
        public void call(Subscriber<? super T> child) {
            final FromArrayProducer<T> arrayProducer = new FromArrayProducer<T>(child, array);
            child.setProducer(arrayProducer);
        }
    
        /**
         * Producer that emits the elements of an array to a child Subscriber in response to
         * {@code request(n)} calls. The inherited AtomicLong value serves as the request counter:
         * it is passed to {@code BackpressureUtils.getAndAddRequest} and read/updated through
         * {@code get()} and {@code addAndGet()} in {@code slowPath}.
         */
        static final class FromArrayProducer<T>
        extends AtomicLong
        implements Producer {
            /** Serialization version id. */
            private static final long serialVersionUID = 3534218984725836979L;
    
            /** Subscriber that receives onNext/onCompleted. */
            final Subscriber<? super T> child;
            /** Source elements, emitted in index order. */
            final T[] array;
    
            /** Position of the next element to emit; slowPath saves it here between drains. */
            int index;
    
            /**
             * @param child the subscriber to emit to
             * @param array the elements to emit
             */
            public FromArrayProducer(Subscriber<? super T> child, T[] array) {
                this.child = child;
                this.array = array;
            }
    
            /**
             * Registers a request for {@code n} items and starts emitting when the value returned by
             * {@code BackpressureUtils.getAndAddRequest} is 0 (presumably the counter value before the
             * addition, i.e. no drain was outstanding -- confirm against BackpressureUtils).
             *
             * @param n number of items requested; {@code Long.MAX_VALUE} selects the fast path, 0 is a no-op
             * @throws IllegalArgumentException if {@code n} is negative
             */
            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    // unbounded request: emit everything without per-item accounting
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    // bounded request: emit at most the requested amount
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
    
            /**
             * Emits every array element and then onCompleted, checking before each call whether
             * the child has unsubscribed and stopping immediately if so.
             */
            void fastPath() {
                final Subscriber<? super T> child = this.child;
    
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
    
                    child.onNext(t);
                }
    
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
    
            /**
             * Emits elements while there is outstanding demand, starting at {@code index}.
             *
             * @param r the demand to serve in the first pass of the loop
             */
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
    
                // emitted-item count kept as a non-positive number (decremented per counted emission)
                long e = 0L;
                // resume from the position saved by an earlier drain
                int i = index;
    
                for (;;) {
    
                    // emit while demand remains and elements are left
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
    
                        child.onNext(array[i]);
    
                        i++;
    
                        // last element delivered: complete (unless unsubscribed) and stop for good
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
    
                        r--;
                        e--;
                    }
    
                    // remaining demand = current counter value plus the (negative) emitted count
                    r = get() + e;
    
                    if (r == 0L) {
                        // save the position before updating the counter
                        index = i;
                        // subtract what was emitted; a non-zero result means more was requested meanwhile
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        // emitted count has been applied to the counter, start counting afresh
                        e = 0L;
                    }
                }
            }
        }
    

    Subscriber.setProducer:如果包装了其他subscriber并且之前没有调用过request,则把producer转交给被包装的subscriber;如果没有包装subscriber且没有调用过request,会调用p.request(Long.MAX_VALUE);如果之前调用过request,则调用p.request(n),n为累积的请求数量。

        /**
         * Installs the Producer for this subscriber.
         *
         * <p>When this subscriber wraps another one and nothing has been requested yet, the
         * producer is forwarded to the wrapped subscriber. Otherwise the producer is asked for
         * the amount recorded in {@code requested}, or for {@code Long.MAX_VALUE} when no request
         * was recorded.
         *
         * @param p the producer to install
         */
        public void setProducer(Producer p) {
            final long pending;
            final boolean forwardDownstream;
            synchronized (this) {
                pending = requested;
                producer = p;
                // middle operator with no request made so far: pass the producer through
                forwardDownstream = subscriber != null && pending == NOT_SET;
            }
            // the calls below happen after the lock is released
            if (forwardDownstream) {
                subscriber.setProducer(producer);
                return;
            }
            // issue whatever has been requested, or Long.MAX_VALUE when nothing was
            producer.request(pending == NOT_SET ? Long.MAX_VALUE : pending);
        }
    

    FromArrayProducer.request(n)

            /**
             * Records a request for {@code n} items and starts emitting when
             * {@code BackpressureUtils.getAndAddRequest} returns 0.
             *
             * @param n number of items requested; {@code Long.MAX_VALUE} selects the fast path, 0 is a no-op
             * @throws IllegalArgumentException if {@code n} is negative
             */
            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == 0) {
                    return;
                }
                if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
                    return;
                }
                if (n == Long.MAX_VALUE) {
                    fastPath();
                } else {
                    slowPath(n);
                }
            }
    

    如果n为Long.MAX_VALUE,并且之前没有未处理完的请求(getAndAddRequest返回0),则调用fastPath

    如果n是一个不为0的有限值,说明下游只请求了有限数量的数据,需要支持背压;此时如果之前没有未处理完的请求(getAndAddRequest返回0),则调用slowPath(n)

            /**
             * Delivers every array element followed by onCompleted, stopping at once
             * when the child reports that it has unsubscribed.
             */
            void fastPath() {
                final Subscriber<? super T> downstream = this.child;
                final T[] items = array;

                for (int pos = 0; pos < items.length; pos++) {
                    if (downstream.isUnsubscribed()) {
                        return;
                    }
                    downstream.onNext(items[pos]);
                }

                if (!downstream.isUnsubscribed()) {
                    downstream.onCompleted();
                }
            }
    

    只要没有unsubscribe,会对数组中每个元素调用Observer的onNext,然后调用Observer.onCompleted()

    Scheduler

    通过subscribeOn和observeOn可以改变操作符执行的线程

    subscribeOn一般只调用一次(多次调用时,只有最靠近数据源的那一次决定数据源所在的线程),表示subscribe操作所在的线程,第一次observeOn之前的所有操作都在subscribeOn所指定的线程

    observeOn可以调用多次,每次调用之后的操作都在该observeOn指定的线程上执行,直到下一次observeOn为止

    subscribeOn,创建一个OperatorSubscribeOn将原来的Observable包装起来。

        /**
         * Returns an Observable whose subscription is performed on the given Scheduler.
         * A {@code ScalarSynchronousObservable} delegates to its own {@code scalarScheduleOn};
         * every other Observable is wrapped in an {@code OperatorSubscribeOn}.
         *
         * @param scheduler the Scheduler to subscribe on
         * @return the rescheduled Observable
         */
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            if (!(this instanceof ScalarSynchronousObservable)) {
                return create(new OperatorSubscribeOn<T>(this, scheduler));
            }
            final ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>)this;
            return scalar.scalarScheduleOn(scheduler);
        }
    
    /**
     * OnSubscribe that subscribes to a source Observable from a Worker of the given Scheduler,
     * and routes downstream {@code request(n)} calls to that same Worker thread.
     */
    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    
        /** Scheduler that supplies the Worker used for subscribing. */
        final Scheduler scheduler;
        /** Upstream Observable that is subscribed to on the Worker. */
        final Observable<T> source;
    
        /**
         * @param source the Observable to subscribe to
         * @param scheduler the Scheduler providing the Worker
         */
        public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
            this.scheduler = scheduler;
            this.source = source;
        }
    
        /**
         * Creates a Worker, ties it to the subscriber, and schedules the subscription to
         * {@code source} on it.
         *
         * @param subscriber the downstream subscriber
         */
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
            // register the worker with the downstream subscriber
            subscriber.add(inner);
    
            inner.schedule(new Action0() {
                @Override
                public void call() {
                    // the thread running this scheduled action; request() compares against it below
                    final Thread t = Thread.currentThread();
    
                    // forwards events downstream and releases the worker on termination
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            try {
                                subscriber.onError(e);
                            } finally {
                                // unsubscribe the worker even if the downstream onError throws
                                inner.unsubscribe();
                            }
                        }
    
                        @Override
                        public void onCompleted() {
                            try {
                                subscriber.onCompleted();
                            } finally {
                                // unsubscribe the worker even if the downstream onCompleted throws
                                inner.unsubscribe();
                            }
                        }
    
                        @Override
                        public void setProducer(final Producer p) {
                            // hand downstream a Producer that relays requests to p on the worker thread
                            subscriber.setProducer(new Producer() {
                                @Override
                                public void request(final long n) {
                                    if (t == Thread.currentThread()) {
                                        // already on the thread that subscribed: request directly
                                        p.request(n);
                                    } else {
                                        // called from another thread: hop onto the worker first
                                        inner.schedule(new Action0() {
                                            @Override
                                            public void call() {
                                                p.request(n);
                                            }
                                        });
                                    }
                                }
                            });
                        }
                    };
    
                    source.unsafeSubscribe(s);
                }
            });
        }
    }
    

    从scheduler中create一个worker,调用worker的schedule执行被包装Observable的unsafeSubscribe方法。同时保证Producer的request方法在worker线程上调用:如果调用request的线程不是worker线程,则通过inner.schedule切换到worker上执行。

    RxJava中的Scheduler有

    • NewThreadScheduler,NewThreadWorker(包装了一个ScheduledExecutorService)
    • ComputationScheduler:EventLoopsScheduler,根据cpu数量确定worker的数量,EventLoopWorker包装了PoolWorker(实际是NewThreadWorker)
    • IOScheduler: CachedThreadScheduler,ThreadWorker: unsubscribe的ThreadWorker可以重用,长期不用的会被清理。

    Map操作符

    /**
     * Returns an Observable that applies {@code func} to every item of this Observable,
     * implemented by wrapping this Observable and the function in an {@code OnSubscribeMap}.
     *
     * @param func the transformation applied to each item
     * @return an Observable of the transformed items
     */
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        final OnSubscribeMap<T, R> mapping = new OnSubscribeMap<T, R>(this, func);
        return create(mapping);
    }
    

    创建一个OnSubscribeMap包装map函数和原Observable。

    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    
        final Observable<T> source;
    
        final Func1<? super T, ? extends R> transformer;
    
        /**
         * Stores the upstream Observable and the mapping function.
         *
         * @param source the Observable whose items are transformed
         * @param transformer the function applied to each item
         */
        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }
    
        /**
         * Wraps the downstream subscriber in a {@code MapSubscriber}, registers that wrapper with
         * the downstream subscriber via {@code add}, and subscribes the wrapper to the source.
         *
         * @param o the downstream subscriber receiving mapped items
         */
        @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            // register the wrapper with the downstream subscriber before subscribing upstream
            o.add(parent);
            source.unsafeSubscribe(parent);
        }
    

    subscribe时会生成一个MapSubscriber包装原来subscriber。

        /**
         * Subscriber placed between the upstream source and the downstream {@code actual}
         * subscriber; it applies {@code mapper} to each item before forwarding it.
         */
        static final class MapSubscriber<T, R> extends Subscriber<T> {

            /** Downstream subscriber receiving mapped values and terminal events. */
            final Subscriber<? super R> actual;

            /** Function applied to every upstream item. */
            final Func1<? super T, ? extends R> mapper;

            /** Set once an error has been forwarded downstream. */
            boolean done;

            /**
             * @param actual the downstream subscriber
             * @param mapper the function applied to each item
             */
            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }

            /**
             * Maps the item and forwards the result. If the mapper throws, this subscriber
             * unsubscribes and reports the failure (with the offending value attached) via onError.
             */
            @Override
            public void onNext(T t) {
                R mapped;
                try {
                    mapped = mapper.call(t);
                } catch (Throwable failure) {
                    Exceptions.throwIfFatal(failure);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(failure, t));
                    return;
                }
                actual.onNext(mapped);
            }

            /**
             * Forwards the first error downstream; any later error goes to
             * {@code RxJavaHooks.onError} instead.
             */
            @Override
            public void onError(Throwable e) {
                if (!done) {
                    done = true;
                    actual.onError(e);
                } else {
                    RxJavaHooks.onError(e);
                }
            }

            /** Forwards completion downstream unless an error was already delivered. */
            @Override
            public void onCompleted() {
                if (!done) {
                    actual.onCompleted();
                }
            }

            /** Passes the upstream Producer straight to the downstream subscriber. */
            @Override
            public void setProducer(Producer p) {
                actual.setProducer(p);
            }
        }
    

    当MapSubscriber的onNext方法被调用时,先用map函数转换一下,然后把结果传给被包装的subscriber的onNext

    相关文章

      网友评论

          本文标题:RxJava源码分析

          本文链接:https://www.haomeiwen.com/subject/lfxrwttx.html