美文网首页
RxJava初学

RxJava初学

作者: 耿之伟 | 来源:发表于2020-03-19 17:08 被阅读0次

    Rxjava简介

    Reactive Extensions for the JVM
    a library for composing asynchronous and event-based programs using observable sequences for the Java VM
    这里说了Rxjava就是JVM响应式扩展Reactive Extensions
    用于使用Java VM的可观察序列编写异步与基于事件的程序库(sdk)。

    使用观察者模式,采用链式编程,基于事件的实现异步的库

    Rxjava的简单使用

    首先创建被观察者(Observable)

    Observable navel = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter e) throws Exception {
                    e.onNext("hello,rxjava");
                    e.onNext("认识一下 Rxjava");
                    e.onComplete();
                }
            });
    

    我们这里看下Observable.create的源码:

    /**
         * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
         * <p>
         * Example:
         * <pre><code>
         * Observable.&lt;Event&gt;create(emitter -&gt; {
         *     Callback listener = new Callback() {
         *         &#64;Override
         *         public void onEvent(Event e) {
         *             emitter.onNext(e);
         *             if (e.isLast()) {
         *                 emitter.onComplete();
         *             }
         *         }
         *
         *         &#64;Override
         *         public void onFailure(Exception e) {
         *             emitter.onError(e);
         *         }
         *     };
         *
         *     AutoCloseable c = api.someMethod(listener);
         *
         *     emitter.setCancellable(c::close);
         *
         * });
         * </code></pre>
         * <p>
         * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="">
         * <p>
         * You should call the ObservableEmitter's onNext, onError and onComplete methods in a serialized fashion. The
         * rest of its methods are thread-safe.
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
         * </dl>
         *
         * @param <T> the element type
         * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
         * @return the new Observable instance
         * @see ObservableOnSubscribe
         * @see ObservableEmitter
         * @see Cancellable
         */
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    ObservableOnSubscribe<T>,可以理解为一个计划表,泛型T是要操作对象的类型

    /**
     * A functional interface that has a {@code subscribe()} method that receives
     * an instance of an {@link ObservableEmitter} instance that allows pushing
     * events in a cancellation-safe manner.
     *
     * @param <T> the value type pushed
     */
    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(ObservableEmitter<T> e) throws Exception;
    }
    

    ObservableOnSubscribe只需要实现一个接口subscribe,参数是ObservableEmitter<T>;看源码可以知道ObservableEmitter继承了Emitter
    那我们看下Emitter的接口

    /**
     * Base interface for emitting signals in a push-fashion in various generator-like source
     * operators (create, generate).
     *
     * @param <T> the value type emitted
     */
    public interface Emitter<T> {
    
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(T value);
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(Throwable error);
    
        /**
         * Signal a completion.
         */
        void onComplete();
    }
    

    Emitter发射器可以看到只有三个接口void onNext(T value)、void onError(Throwable error)、void onComplete()。
    onNext方法可以无限调用,Observer(观察者)所有的都能接收到,onError和onComplete是互斥的,Observer(观察者)只能接收到一个,OnComplete可以重复调用,但是Observer(观察者)只会接收一次,而onError不可以重复调用,第二次调用就会报异常。

    第二步:创建观察者

    Observer reader = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    //gengwei 这里说明一下这个Disposable;这个对象就是观察者和被观察者的关系,
                    //如果观察者Observer不想继续订阅被观察者了,就可以主动取消掉
                    Log.e("gengwei","onSubscribe");
                }
    
                @Override
                public void onNext(String value) {
                    Log.e("gengwei","onNext"+value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("gengwei","onError");
                }
    
                @Override
                public void onComplete() {
                    Log.e("gengwei","onComplete");
                }
            };
    

    观察者通过new创建,可以看到有四个方法需要实现。onNext、onError、onComplete都是跟被观察者发射的方法一一对应的,这里就相当于接收了,这三个很好理解。关于onSubscribe这个方法需要关注下,我代码里备注了。

    第三步:创建观察者关系

    navel.subscribe(reader);
    

    只需要一行代码搞定。
    打log看下输出:

    2020-03-19 15:41:32.229 10250-10250/com.example.myapplication E/gengwei: onSubscribe
    2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onNexthello,rxjava
    2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onNext认识一下 Rxjava
    2020-03-19 15:41:32.230 10250-10250/com.example.myapplication E/gengwei: onComplete
    

    这就是Rxjava最简单的使用方法

    Rxjava的异步和链式编程

    前面说过Rxjava是支持异步链式编程;我们先看下链式编程。
    我们可以看下Observable<T>这个被观察者的API
    这里列举部分,有兴趣可以去看下源码接口:

    /**
         * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
         * <p>
         * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>You specify which {@link Scheduler} this operator will use</dd>
         * </dl>
         *
         * @param scheduler
         *            the {@link Scheduler} to perform subscription actions on
         * @return the source ObservableSource modified so that its subscriptions happen on the
         *         specified {@link Scheduler}
         * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
         * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
         * @see #observeOn
         */
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> fromArray(T... items) {
            ObjectHelper.requireNonNull(items, "items is null");
            if (items.length == 0) {
                return empty();
            } else
            if (items.length == 1) {
                return just(items[0]);
            }
            return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
        }
    
     /**
         * Returns an Observable that emits a single item and then completes.
         * <p>
         * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/just.png" alt="">
         * <p>
         * To convert any object into an ObservableSource that emits that object, pass that object into the {@code just}
         * method.
         * <p>
         * This is similar to the {@link #fromArray(java.lang.Object[])} method, except that {@code from} will convert
         * an {@link Iterable} object into an ObservableSource that emits each of the items in the Iterable, one at a
         * time, while the {@code just} method converts an Iterable into an ObservableSource that emits the entire
         * Iterable as a single item.
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>{@code just} does not operate by default on a particular {@link Scheduler}.</dd>
         * </dl>
         *
         * @param item
         *            the item to emit
         * @param <T>
         *            the type of that item
         * @return an Observable that emits {@code value} as a single item and then completes
         * @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX operators documentation: Just</a>
         */
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> just(T item) {
            ObjectHelper.requireNonNull(item, "The item is null");
            return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
        }
    ...
    

    这里只是列举几个Rxjava的几个API,但是可以清晰的看到每一个方法都会再返回一个Observable<T>;这样的话就能形成一套简洁的链式代码。

    下边再看一下Rxjava的异步编程是如何实现的,这里就要用到Scheduler(调度器),它是RxJava用来控制线程。

    我们把刚刚的demo使用链式异步再写一下:

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter e) throws Exception {
                    e.onNext("hello,rxjava");
                    //gengwei,这里增加休眠3s
                    Thread.sleep(3000);
                    e.onNext("认识一下 Rxjava");
                    e.onComplete();
                }
            })
                    .observeOn(AndroidSchedulers.mainThread()) //回调在mainThread
                    .subscribeOn(Schedulers.io())//执行在io线程
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            //gengwei 这里说明一下这个Disposable;这个对象就是观察者和被观察者的关系,
                            //如果观察者Observer不想继续订阅被观察者了,就可以主动取消掉
                            Log.e("gengwei","onSubscribe");
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.e("gengwei","onNext"+value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("gengwei","onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("gengwei","onComplete");
                        }
                    });
    

    这里就是RxJava最常用的写法,异步+链式编程,还要再说一下,subscribe的方法重载,subscribe()方法里什么参数也不放是空实现,也就是说连载小说无论出什么连载,读者都不关心,推送过来了也不读,如果读者只关心onNext方法里的内容,可以直接重载subscribe(Consumer<? spuer T> onNext)这个方法,会减少代码,当然如果是初学者还是建议创建Observer对象。
    看下输出


    image.png

    注意看下箭头的打印时间,可以看出这里sleep了3s。

    好了RxJava的初学就到这里,下一篇继续RxJava的进阶。

    相关文章

      网友评论

          本文标题:RxJava初学

          本文链接:https://www.haomeiwen.com/subject/khsmdhtx.html