美文网首页
RxJava源码之create,初窥门道

RxJava源码之create,初窥门道

作者: 丑人林宗己 | 来源:发表于2019-03-24 22:53 被阅读0次

    简单学习分析RxJava的源码,预计会写三篇

    • create
    • Schedulers.newThread()
    • zip

    create

    关于坊间对于RxJava的定义都略有耳闻,除了其响亮的旗号响应式编程之外,大概所知至多应是其基于常见设计模式之观察者模式的框架设计理念。作为一枚Java工程师,理应知道JDK库内置了观察者模式的实现API接口,至于二者之间有何区别不妨自行了解,本文仅关注RxJava的具体实现思路。

    在阅读源码之前,先写一个简单的Demo做为实践是一件极其必要的事,若不先知其然就欲知其所以然,就有些主次颠倒。不过不要紧,我这里提供了一个极其简单的Demo可供运行,但是Demo的运行过程也不仅仅只在运行,我们还要将其中涉及到的关键类找出来,并通过IDE找到顶层接口设计。从顶层接口的设计辅之以Demo的运行思路即可俯瞰该模块的设计思路。

    Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    System.out.println("thread name : " + Thread.currentThread().getName());
                    emitter.onNext("ABC");
                }
            }).subscribe(new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    
                }
    
                @Override
                public void onNext(String t) {
                    System.out.println("thread name : " + Thread.currentThread().getName());
                    System.out.println("create string value : " + t);
                }
    
                @Override
                public void onError(Throwable e) {
                    
                }
    
                @Override
                public void onComplete() {
                    
                }
            });
    

    顶级接口设计

    找到顶级接口,是完成源码阅读的首要任务,顶层接口可帮助我们俯瞰整体架构设计。

    Observable

    可观察类,我认为可以理解为被观察者,查阅其注释如下:

    /**
     * The Observable class is the non-backpressured, optionally multi-valued base reactive class that
     * offers factory methods, intermediate operators and the ability to consume synchronous
     * and/or asynchronous reactive dataflows.
     * ....
    **/
    public abstract class Observable<T> implements ObservableSource<T> {
      //...
    }
    

    在注释中完全可以提取到很关键的信息:非背压,提供工厂方法、操作符以及同步异步的响应式数据流操作能力,也就是说它的根本能力应该是提供我们常见的关键API入口。同时,它实现了一个接口ObservableSource

    ObservableSource

    字面理解,可观察源,查阅注释如下:

    /**
     * Represents a basic, non-backpressured {@link Observable} source base interface,
     * consumable via an {@link Observer}.
     *
     * @param <T> the element type
     * @since 2.0
     */
    public interface ObservableSource<T> {
    
        /**
         * Subscribes the given Observer to this ObservableSource instance.
         * @param observer the Observer, not null
         * @throws NullPointerException if {@code observer} is null
         */
        void subscribe(@NonNull Observer<? super T> observer);
    }
    
    

    在注释中完全可以提取到很关键的信息:非背压,Observable source 顶级接口,可供观察者消费,而该接口只有一个待实现的方法:subscribe(@NonNull Observer<? super T> observer),该方法的入参为Observer,意为观察者,那么它的存在就是订阅(链接)观察者。在观察者模式中,被观察者在观察者的监禁下存活,一引而动,其中最为重要的应当二者的桥梁。

    ObservableOnSubscribe

    从字面上很难理解该接口的意义,但是辅之以我们的Demo,以及该接口的注释即可理解该类的作用。

    /**
     * A functional interface that has a {@code subscribe()} method that receives
     * an instance of an {@link ObservableEmitter} instance that allows pushing
     * events in a cancellation-safe manner.
     *
     * @param <T> the value type pushed
     */
    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    

    在注释中可以看到它的意义在于推送事件,而入参是一个ObservableEmitter对象,意为发射器,结合我们的Demo实践结果,可以猜测该接口实现用以推送事件,即触发观察者响应事件。

    ObservableEmitter

    扩展了Emitter接口,意为发射器,暂时不看ObservableEmitter接口扩展的几个待实现函数,关键看扩展父接口Emitter的几个待实现函数

    /**
     * Base interface for emitting signals in a push-fashion in various generator-like source
     * operators (create, generate).
     *
     * @param <T> the value type emitted
     */
    public interface Emitter<T> {
    
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value); // 发送数据
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);// 触发异常机制
    
        /**
         * Signal a completion.
         */
        void onComplete();// 完成事件,可以认为该观察者事件完结
    }
    

    Observer

    字面理解,观察者。查阅注释如下:

    /**
     * Provides a mechanism for receiving push-based notifications.
     * ....
     */
    public interface Observer<T> {}
    

    在注释中可以看到,它的意思是响应Emitter发射器的推送事件的顶级接口。

    细节

    可以说,我们基本上已经俯瞰了总的设计图,大致的方向其实较为明朗,但是它们是如何穿针引线的达到目的的,可能还会有一些迷糊。通过IDE来查看源码,按照目前的了解,再辅之以实现细节,即可更加全面的了解其中的设计路线。

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    一个很关键的类出现了:ObservableCreate,它的类意义可以拆分为Observable, Create。命名方案贯穿整个RxJava框架,也是我们可以参考学习的地方。从字面上大致可以看出它或许隐藏着create关键字非常具体,细节的实现方案。

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) { // 该接口的subscribe的入参是发射器
            this.source = source;
        }
    
        // 这里为啥不是调用的subscribe?可以往上看,该方法实质是模板方法,父类调用
        // Observable#subscribe(Observer<? super T> observer);
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);// 构造create的发射器
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);  // ObservableOnSubscribe调用了入参的发射器,回看Demo
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
    // ....
    }
    

    回来看到Demo, ObservableOnSubscribe的实现细节。

    System.out.println("thread name : " + Thread.currentThread().getName());
    emitter.onNext("ABC"); // 发射器的onNext()意为发送数据
    

    发射出了事件,如何响应观察者对象呢?回去看ObservableCreate.CreateEmitter类的源码。

     public final class ObservableCreate<T> extends Observable<T> {
        // .....
        static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
            /// ...
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
               /// ...
               observer.onNext(t); // 观察者的onNext()方法。入参为t,发射器发射出去的数据
            }
            /// ....
            @Override
            public void onComplete() {
               /// ... 
              observer.onComplete(); // 观察者的onComplete()
            }
        }
    }
    

    如此,可算脉络清晰了,辅以简单的类结构图,望能更加清晰的展现create关键字的设计思路。

    类结构图.png

    总结

    你有get到你想要的吗?

    个人认为,查阅源码在于学习其设计的精髓,我很少会去深挖框架很细节的东西,当然,排查bug除外。阅读源代码仅个人爱好,它可能无法很直接的帮助我把业务代码写的更好,但是很具有借鉴意义。

    下一篇会学习RxJava中关于异步的实现方案,以及整体的设计思路。

    相关文章

      网友评论

          本文标题:RxJava源码之create,初窥门道

          本文链接:https://www.haomeiwen.com/subject/kmspvqtx.html