美文网首页
RxJava2 源码分析(二)

RxJava2 源码分析(二)

作者: 徘徊0_ | 来源:发表于2019-08-08 18:02 被阅读0次

    简述:简单分析Rxjava2 常见操作符的源码

    range操作符原理

    range是一个生产操作符,例如下面的示例代码,发送0~5给下游:

    Disposable disposable1 = Observable.range(0, 5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept: t" + integer);
                }
            });
    

    生产操作符主要是在range实现,具体看一下:

      //规定了类型为Integer
    public static Observable<Integer> range(final int start, final int count) {
            //一些判断操作
            if (count < 0) {
                throw new IllegalArgumentException("count >= 0 required but it was " + count);
            }
            if (count == 0) {
                return empty();
            }
            if (count == 1) {
                return just(start);
            }
            if ((long)start + (count - 1) > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Integer overflow");
            }
            //主要的方法
            return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
        }
    
    • 返回值类型为:Observable<Integer>
    • 主要的方法是:new ObservableRange(start, count)

    继续看一下ObservableRange,类不是太长主要包含RangeDisposable静态内部类和subscribeActual();方法:

    public final class ObservableRange extends Observable<Integer> {
        private final int start;
        private final long end;
    
        public ObservableRange(int start, int count) {
            this.start = start;
            this.end = (long)start + count;
        }
    
        //将观察者传入
        @Override
        protected void subscribeActual(Observer<? super Integer> o) {
            RangeDisposable parent = new RangeDisposable(o, start, end);
            o.onSubscribe(parent); // 观察者跟Disposable 关联,可以截断从上游到下游的数据
            parent.run();
        }
    
        static final class RangeDisposable
        extends BasicIntQueueDisposable<Integer> {
    
            private static final long serialVersionUID = 396518478098735504L;
    
            final Observer<? super Integer> downstream;
    
            final long end;
    
            long index;
    
            boolean fused;
    
            RangeDisposable(Observer<? super Integer> actual, long start, long end) {
                this.downstream = actual;
                this.index = start;
                this.end = end;
            }
    
            void run() {
                if (fused) {
                    return;
                }
                Observer<? super Integer> actual = this.downstream;
                long e = end;
                for (long i = index; i != e && get() == 0; i++) {
                    actual.onNext((int)i);
                }
                if (get() == 0) {
                    lazySet(1);
                    actual.onComplete();
                }
            }
    
            @Nullable
            @Override
            public Integer poll() throws Exception {
                long i = index;
                if (i != end) {
                    index = i + 1;
                    return (int)i;
                }
                lazySet(1);
                return null;
            }
    
            @Override
            public boolean isEmpty() {
                return index == end;
            }
    
            @Override
            public void clear() {
                index = end;
                lazySet(1);
            }
    
            @Override
            public void dispose() {
                set(1);
            }
    
            @Override
            public boolean isDisposed() {
                return get() != 0;
            }
    
            @Override
            public int requestFusion(int mode) {
                if ((mode & SYNC) != 0) {
                    fused = true;
                    return SYNC;
                }
                return NONE;
            }
        }
    }
    
    • subscribeActual 方法中,将参数跟需要发送的内容关联起来,最后调用了RangeDisposable.run();方法。

    • 上面代码可以看出,将o、start、endo为Observer作为参数传到了RangeDisposable,这里需要注意,RangeDisposable.run()方法,用了一个for循环,依次调用onNext,将int发送给下游

          void run() {
                if (fused) {
                    return;
                }
                Observer<? super Integer> actual = this.downstream;
                long e = end;
                // 这里for循环,调用观察者的onNext,依次发送 int,
                for (long i = index; i != e && get() == 0; i++) {
                    actual.onNext((int)i);
                }
                if (get() == 0) {
                    lazySet(1);
                    actual.onComplete();
                }
            }
    
    Map操作符原理

    map是转换操作符,例如下面的代码,将String类型转换为Integer类型:

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    //发送 String 类型
                    emitter.onNext("1");
                }
            }).map(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) throws Exception {
                    //String 类型 转换为 Integer
                    return Integer.valueOf(s);
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept: " + integer);
                }
            });
    

    转换操作,是在map这里实现,跟一下代码:

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
             //判空操作
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            // 重要方法 
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

    注意:

    • 这里两个泛型的类型为:StringInteger
    • 这个方法返回值是Observable,实际上返回的是new ObservableMap<T, R>(this, mapper),主要的实现还是在ObservableMap这个类中
    ObservableMap 分析

    ObservableMap类中包含一个静态内部类:MapObserver 具体如下:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                downstream.onNext(v);
            }
    
            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }
    
            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qd.poll();
                return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }
    }
    
    • ObservableMap构造方法,将ObservableSourceFunction传入,其中Function泛型分别为T : StringU: Integer
    • subscribeActual 上一篇中分析过,被观察者订阅观察者的时候,调用的是subscribeActual这个方法。
    • MapObserver 具体的转换类,具体分析一下onNext(T t);
            @Override
            public void onNext(T t) { // 这个T = String ,代表转换前的类型
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v; // U: Integer ,指的是需要转换的类型
    
                /**
                 *具体的转换方法是:mapper.apply(t);,将t转换为v类型
                 */
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                //将转换好的v ,调用onNext()发送给下游,从而实现了类型转换
                downstream.onNext(v);
            }
    

    注意

    • 上面的Function是一个接口
    public interface Function<T, R> {
        /**
         * Apply some calculation to the input value and return some other value.
         * @param t the input value
         * @return the output value
         * @throws Exception on error
         */
        R apply(@NonNull T t) throws Exception; 
    }
    

    所以,上面调用mapper.apply(t)其实是找具体实现的apply方法(这里apply返回值类型R其实就是Integer)

    其实最开始代码已经具体实现了Function方法:

    Function具体实现.png

    到这里,也就完成了一次类型转换,将String 转换成了 Integer。

    总结:

    通过上面的2个操作符的分析,其实都是对Observable进行包装/变换,例如下面的两个操作符(可以猜想其它操作符应该也是差不多的流程):
    1,range使用new ObservableRange(start, count);Observable进行包装处理。
    2,map 使用new ObservableMap<T, R>(this, mapper)Observable进行包装处理。

    相关文章

      网友评论

          本文标题:RxJava2 源码分析(二)

          本文链接:https://www.haomeiwen.com/subject/upxfjctx.html