美文网首页
RxJava原理分析

RxJava原理分析

作者: 雷涛赛文 | 来源:发表于2020-09-11 10:59 被阅读0次

          之前看有的项目上使用到了RxJava,最近比较系统的学习了一下,将学习收获整理一下。

    一.RxJava简介

    a.定义
    定义 作用 特点
    一个基于事件流、实现
    异步操作的库
    实现异步操作
    类似与AyncTask、Handler的使用
    逻辑简洁
    实现优雅:基于事件流的链式调用
    使用简单:随着程序的复杂,依然保持简洁优雅
    b.思路

           被观察者 (Observable) 通过订阅(Subscribe)按顺序发送事件给观察者(Observer)
           观察者(Observer)按顺序接收事件&作出对应的响应动作。

    二.使用流程

    a:创建被观察者(Observable)&定义需发送的事件
    Observable<String> obervable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("事件1");
            e.onNext("事件2");
            e.onNext("事件3");
        }
    });
    
    b:创建观察者(Observer) & 定义响应事件的行为
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            //该方法最先调用
        }
    
        @Override
        public void onNext(String value) {
           //事件接收
           Log.e(TAG, "value is: " + value);
        }
    
        @Override
        public void onError(Throwable e) {
           //异常回调
        }
    
        @Override
        public void onComplete() {
          //事件发送完毕回调
        }
    }   
    

           从方法名字可以得到:事件的接收处理是在onNext()里面,异常处理是在onError()里面,事件发送完毕处理是在onComplete()里面,事件的最早触发是在onSubscribe()里面[后面源码会分析到];

    c:通过订阅(subscribe)连接观察者和被观察者
    observable.subscribe(observer);
    

           Observable的subscribe()具备多个重载的方法,可以灵活运用,来实现自己的需求;

    //表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
    public final Disposable subscribe() {}
    // 表示观察者只对被观察者发送的Next事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // 表示观察者对被观察者发送的任何事件都作出响应(一般采用此种方式)
    public final void subscribe(Observer<? super T> observer) {}
    

           通过subscribe传入Consumer实例时,执行对应事件的响应回调都是在accept里面进行处理;

    /**
     * A functional interface (callback) that accepts a single value.
     * @param <T> the value type
     */
    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    

           输出以下结果:

    value is: 事件1
    value is: 事件2
    value is: 事件3
    

           以上就完成了基于事件流的链式调用过程,接下来通过源码来分析一下详细工作过程:

    三.源码分析

    a:创建被观察者(Observable)& 定义需发送的事件
    //调用create时创建Observable
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //最终是创建了ObservableCreate类对象 
        //将创建的ObservableOnSubscribe对象传入 
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

           通过以上可以看到,通过create()方法,创建了ObservableCreate对象,然后将ObservableOnSubscribe变量source作为参数传入。
           接下来看一下ObservableCreate的实现:

    //ObservableCreate类,继承Observable类
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        //复写了subscribeActual()
        //作用:订阅时,通过接口回调调用Observerable与Observer的方法
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //先调用oberver的onSubscribe方法
            //印证了上述onSubscribe先调用的猜想
            observer.onSubscribe(parent);
    
            try {
                //调用创建的ObservableOnSubscribe对象的subscribe方法
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
        //执行e.onNext("事件1")方法
        static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
            //observer是订阅前创建的Observer
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                //发送的事件不可为空
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                //若无断开连接(未调用Disposable.dispose())
                if (!isDisposed()) {
                    //调用Observer的onNext()方法
                    observer.onNext(t);
                }
            }
    
            // onError()及onComplete()方法调用后都会调用dispose();就不会发送onNext()事件了
            // 印证了Observer在收到onError()或onComplete()后,就不会再收到onNext()了
            @Override
            public void onError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            @Override
            public void setDisposable(Disposable d) {
                DisposableHelper.set(this, d);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
        }
    

           通过源码可以看到在执行Observable.create()实际上是创建了一个ObservableCreate对象,并将创建的ObservableOnSubscribe对象作为参数传入,复写了subscribeActual()方法,在方法内创建了携带Observer的CreateEmitter对象,并分别回调了Observer的onSubscribe,ObservableOnSubscribe的subscribe方法。在create时,仅仅是定义,即:subscribeActual()此时还未被回调。

    b:创建观察者(Observer) & 定义响应事件的行为
    //Observer.java
    public interface Observer<T> {
        //Observer是一个接口
        // 接口内含4个方法,分别用于响应对应于被观察者发送的不同事件
        void onSubscribe(Disposable d); // 内部参数:Disposable 对象,可结束事件
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }
    
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            //该方法最先调用
        }
    
        @Override
        public void onNext(String value) {
           //事件接收
           Log.e(TAG, "value is: " + value);
        }
    
        @Override
        public void onError(Throwable e) {
           //异常回调
        }
    
        @Override
        public void onComplete() {
          //事件发送完毕回调
        }
    }   
    
    c:通过订阅(subscribe)连接观察者和被观察者
    observable.subscribe(observer);
    //Observable.java
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //subscribeActual属于抽象方法,由子类实现;即由创建Observable时创建的ObservableCreate类对象
            //即在调用subscribe时,实际上是调用了创建Observable时创建的ObservableCreate类对象里面的subscribeActual()方法
            //印证了前面所说的在Observable.create()时未执行subscribeActual()
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
        }
    }
    //抽象方法,需要非抽象子类实现
    protected abstract void subscribeActual(Observer<? super T> observer);
    

           从源码可以看到,在执行subscribe()时,最终是调用了ObservableCreate类里面的subscribeActual()方法,在subscribeActual()方法内先是调用了Observer的onSubscribe(),接下来调用source即:ObservableOnSubscribe复写的subscribe()方法,在ObservableOnSubscribe的subscribe()内调用了ObservableEmitter的onNext()、onError()等方法,就开始了事件流的执行。
          总结

    步骤 逻辑实现 源码分析
    创建Observable&定义发送事件 1.调用Observable.create()
    2.创建ObservableOnSubscribe对象
    3.复写subscribe()
    1.创建ObservableCreate类对象
    2.复写subscribeActual()方法
    创建Observer&定义响应事件的行为 1.创建Observer类对象
    2.复写onSubscribe(),onNext(),onError(),onComplete()方法
    1.Observer是一个接口
    2.接口内有四个方法,分别响应Observable发送的不同事件
    subscribe订阅 调用Observable.subscribe(observer) 1.最终调用Observable的子类对象ObservableCreate类的subscribeActual()方法,主要调用如下:
    a.创建CreateEmitter对象;
    b. 调用ObservableOnSubscribe对象复写的subscribe();
    c.调用Observer复写的onSubscribe();
    d.在subscribe()里面调用onNext(),onError(),onComplete()再回调Observer复写的对应方法;

          用一张类图来表示一下类之间的联系

    1.png

    四.线程切换

    observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
                .observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果
    

          RxJava2的订阅原理是执行subscribe()时从下往上依次调用Observable的各个子类的subscribeActual()方法,在最上层调用onNext()等方法时,会从上往下依次调用Observer的onNext()等方法,最终会调用app传入的observer的next()等方法。

    a.subscribeOn(Schedulers.io())

          首先subscribeOn(Schedulers.io())最终会调用ObservableSubscribeOn.subscribeActual()方法,内部是将source.subscribe()放到一个Runnable执行,该source就是ObservableCreate(),即会调用到ObservableCreate.subscribeActual(),最终会调用到ObservableOnSubscribe.subscribe(CreateEmitter):

    //Observable.java
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
    //ObservableSubscribeOn.java
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        ......
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
            //切换线程
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    //在新线程里面执行subscribe
                    source.subscribe(parent);
                }
            }));
        }
        ......
    }
    

          Schedulers.io()返回一个IoScheduler,该类继承Scheduler,看一下scheduleDirect(new Runnable())执行了什么操作:

    //IoScheduler.java从Scheduler.java继承
        public Disposable scheduleDirect(Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    
    

          通过以上可以看到,在执行scheduleDirect(x,x,x)后,会先执行createWorker(),接着执行w.schedule(Runnable),看一下实现逻辑:

    //IoScheduler.java
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    
        static final class EventLoopWorker extends Scheduler.Worker {
            ......
            ......
            @Override
            public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
                ......
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
    //NewThreadWorker.java
       public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                parent.remove(sr);
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    
    

          可以看到,把传入的Runnable封装成为一个ScheduleRunnable对象。并把这个对象放入线程池中去执行,执行的时候会运行ScheduleRunnable的run方法,最终又会调用ObservableSubscribeOn的run方法,进而调用source.subscribe(),至此subscribeOn()的线程切换就完成了。

    b.observeOn(AndroidSchedulers.mainThread())

          observeOn()时会传入AndroidSchedulers.mainThread(),会创建HandlerScheduler,然后创建Handler,将主线程的Looper传入Handler,后续消息队列都是在主线程执行的,看一下具体逻辑:

    //AndroidSchedulers.java
    public final class AndroidSchedulers {
    
        private static final class MainHolder {
            //创建HandlerScheduler,内部持有主线程Handler的引用
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
        ......
    }
    

          然后在Observable.observeOn()时会创建ObservableObserveOn,并把上述创建的HandlerScheduler传入,先看一下ObservableObserveOn的逻辑实现:

    //ObservableObserveOn.java
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        ......
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
             implements Observer<T>, Runnable {
            .......
            .......
            @Override
            public void onNext(T t) {
                ........
                //在onNext()中执行schdule()
                schedule();
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    //执行worker的schedule方法,该worker是HandlerScheduler中的HandlerWorker,后面会讲到
                    worker.schedule(this);
                }
            }
    
            void drainNormal() {
                 ......
                 //a就是app传入的Observer
                 a.onNext(v);
                 ......
            }
    
            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    }
    

           当执行subscribe(observer)时,会先调用ObservableObserveOn. subscribeActual(),从上面可以看到:
           会先执行scheduler.createWorker(),这个Worker 对象实际上是在AndroidSchedulers.mainThread()内部的HandlerScheduler中生成的,接下来会讲到;
           然后执行source.subscribe(ObserveOnObserver),该ObserveOnObserver对app传入的observer进行了封装,当最上层调用onNext()等方法后,会最终调用到ObserveOnObserver内部onNext()等方法,从上面逻辑实现可以看到,进而会调用schedule()---->worker.schedule(this)[ObserveOnObserver本身是一个Runnable],该worker就是HandlerWorker,接下来执行到HandlerWorker的schedule(x,x,x),这里面会有一个主线程的Handler对象,然后把特定的线程任务[ObserveOnObserver]通过handler.sendMessageDelayed()方法转移到主线程中去执行,一起看一下HandlerScheduler的实现逻辑:

    final class HandlerScheduler extends Scheduler {
        ........
        @Override
        public Worker createWorker() {
            //该handler是主线程的handler
            return new HandlerWorker(handler);
        }
    
       private static final class HandlerWorker extends Worker {
            ......
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                .......
                run = RxJavaPlugins.onSchedule(run);
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
                .......
                return scheduled;
            }
            ........
        }
    
        private static final class ScheduledRunnable implements Runnable, Disposable {
            .......
    
            @Override
            public void run() {
                try {
                     //该delegate就是ObserveOnObserver[本身就是一个runnable],最终在主线程调用run()
                    delegate.run();
                }
                .......
        }
    

           最后会在主线程里面执行observer的onNext()等方法,以上就是observer线程切换。
           简单总结一下:
           subscribe(observer)后涉及到Observable类的执行顺序:ObservableObserveOn-->ObservableSubscribeOn-->ObservableCreate-->ObservableOnSubscribe.subscribe(CreateEmitter e);
           e.onNext()后涉及到Observer类的执行顺序:CreateEmitter-->SubscribeOnObserver-->ObserveOnObserver-->Observer(app)

    五.实例分析

           由于RxJava是开源的库,要想使用的话需要添加依赖,在AndroidStudio里面添加如下:

    // Android 支持 Rxjava,使用RxJava2的版本
    implementation 'io.reactivex.rxjava2:rxjava:2.0.2'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    
    a.简单事件流处理
    Observable.create(new ObservableOnSubscribe<String>() {
         @Override
         public void subscribe(ObservableEmitter<String> e) throws Exception {
             e.onNext("你好");
             e.onNext("RxJava");
             e.onNext("今天学习一下RxJava");
         }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      //Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
      .subscribe(new Observer<String>() {
             private Disposable mDisposable;
             @Override
             public void onSubscribe(Disposable d) {
                 // 该方法最先调用
                 Log.e(TAG, "-----onSubscribe()------");
                 mDisposable = d;
             }
    
             @Override
             public void onNext(String value) {
                 Log.e(TAG, "-----onNext(): " + value);
                 //1.dispose()后,observer就不再接收后面的消息,即"今天学习一下RxJava"接收不到了
                 if (value.equals("RxJava")) {
                    mDisposable.dispose();
                 }
                //2.在收到onComplete()或onError()之后,就不会回调该方法了
             }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
             public void onComplete() {
                  Log.e(TAG, "-----onComplete()");
             }
    });
    
    /**
      *以下两种方式可以代替e.onNext("你好");e.onNext("RxJava");e.onNext("今天学习一下RxJava");
      */
    //1.Observable observable = Observable.just("你好","RxJava","今天学习一下RxJava");
    //String[] words = {"你好","RxJava","今天学习一下RxJava"};
    //2.Observable observable = Observable.fromArray(words);
    未加dispose()输出结果为:
    -----onNext():你好
    -----onNext():RxJava
    -----onNext():今天学习一下RxJava
    加dispose()输出结果为:
    -----onNext():你好
    -----onNext():RxJava
    
    b.Retrofit+RxJava网络请求

          Retrofit是square开源的网络Restful请求框架,底层是基于okhttp的,开发者只需要定义接口就可以了,Retrofit提供了注解可以表示该接口请求的请求方式、参数、url等。定义好了接口以后,在调用该远程接口的时候直接使用该接口就好像通过RPC方式使用本地类一样方便。
          1.加入依赖

    // Android 支持 Retrofit
    implementation 'com.squareup.retrofit2:retrofit:2.1.0'
    // 衔接 Retrofit & RxJava,要注意使用RxJava2的版本
    implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    // 支持Gson解析
    implementation 'com.squareup.retrofit2:converter-gson:2.1.0'
    

          2.创建接收服务器返回数据的类
          根据请求返回的Json数据格式,来定义与其对应的类,本实例是参考天气查询,返回的数据格式如下:

    {"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
                          "city":"青岛",
                          "forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
                          "ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
    "status":1000,
    "desc":"OK"}
    

          创建返回数据对应的类WeatherInfo.java

    public class WeatherInfo {
        private DataBean data;
        private int status;
        private String desc;
    
        public DataBean getData() {
            return data;
        }
    
        public void setData(DataBean data) {
            this.data = data;
        }
    
        public int getStatus() {
            return status;
        }
    
        public void setStatus(int status) {
            this.status = status;
        }
    
        public String getDesc() {
            return desc;
        }
    
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        public static class DataBean {
            private YesterdayBean yesterday;
            private String city;
            private String aqi;
            private String ganmao;
            private String wendu;
            private List<ForecastBean> forecast;
    
            public YesterdayBean getYesterday() {
                return yesterday;
            }
    
            public void setYesterday(YesterdayBean yesterday) {
                this.yesterday = yesterday;
            }
    
            public String getCity() {
                return city;
            }
    
            public void setCity(String city) {
                this.city = city;
            }
    
            public String getAqi() {
                return aqi;
            }
    
            public void setAqi(String aqi) {
                this.aqi = aqi;
            }
    
            public String getGanmao() {
                return ganmao;
            }
    
            public void setGanmao(String ganmao) {
                this.ganmao = ganmao;
            }
    
            public String getWendu() {
                return wendu;
            }
    
            public void setWendu(String wendu) {
                this.wendu = wendu;
            }
    
            public List<ForecastBean> getForecast() {
                return forecast;
            }
    
            public void setForecast(List<ForecastBean> forecast) {
                this.forecast = forecast;
            }
    
            public static class YesterdayBean {
    
                private String date;
                private String high;
                private String fx;
                private String low;
                private String fl;
                private String type;
    
                public String getDate() {
                    return date;
                }
    
                public void setDate(String date) {
                    this.date = date;
                }
    
                public String getHigh() {
                    return high;
                }
    
                public void setHigh(String high) {
                    this.high = high;
                }
    
                public String getFx() {
                    return fx;
                }
    
                public void setFx(String fx) {
                    this.fx = fx;
                }
    
                public String getLow() {
                    return low;
                }
    
                public void setLow(String low) {
                    this.low = low;
                }
    
                public String getFl() {
                    return fl;
                }
    
                public void setFl(String fl) {
                    this.fl = fl;
                }
    
                public String getType() {
                    return type;
                }
    
                public void setType(String type) {
                    this.type = type;
                }
    
                public String toString() {
                    return "{" + "\"date\":\"" + getDate() + "\"," + "\"high\":\"" + getHigh() + "\","
                            + "\"fx\":\"" + getFx() + "\"," + "\"low\":\"" + getLow() + "\","
                            + "\"fl\":\"" + getFl() +
                            "\"," + "\"type\":\"" + getType() + "\"}";
                }
            }
    
            public static class ForecastBean {
    
                private String date;
                private String high;
                private String fengli;
                private String low;
                private String fengxiang;
                private String type;
    
                public String getDate() {
                    return date;
                }
    
                public void setDate(String date) {
                    this.date = date;
                }
    
                public String getHigh() {
                    return high;
                }
    
                public void setHigh(String high) {
                    this.high = high;
                }
    
                public String getFengli() {
                    return fengli;
                }
    
                public void setFengli(String fengli) {
                    this.fengli = fengli;
                }
    
                public String getLow() {
                    return low;
                }
    
                public void setLow(String low) {
                    this.low = low;
                }
    
                public String getFengxiang() {
                    return fengxiang;
                }
    
                public void setFengxiang(String fengxiang) {
                    this.fengxiang = fengxiang;
                }
    
                public String getType() {
                    return type;
                }
    
                public void setType(String type) {
                    this.type = type;
                }
    
                public String toString() {
                    return "{" + "\"date\":\"" + getDate() + "\"," + "\"fengli\":\"" + getFengli()
                            + "\"," + "\"dengxiang\":\"" + getFengxiang() + "\"," + "\"high\":\""
                            + getHigh() + "\"," + "\"low\":\"" + getLow() + "\"," + "\"type\":\""
                            + getType() + "\"}";
                }
            }
    
            public String toString() {
                String s = "";
                for (ForecastBean fb : forecast) {
                    s += fb.toString() + ",";
                }
                return "\"data\":" + "{" + "\"yesterday\":" + getYesterday().toString() + "," +
                        "\"city\":\"" + getCity() + "\"," + "\"forest\":" + "[" + s + "]" + "," + 
                        "\"ganmao\":\"" + getGanmao() + "\"," + "\"wendu\":\"" + getWendu() + "\""
                        + "}";
            }
        }
    
        public String toString() {
            return "{" + getData().toString() + "," + "\"status\":" + getStatus() + "," + "\"desc\":\""
                    + getDesc() + "\"" + "}";
        }
    }
    

          3.创建用于描述网络请求的接口RetrofitApi
          采用注解 + Observable<...>接口描述网络请求参数

    public interface RetrofitApi {
    
        /**
         * 注解里传入 网络请求 的部分URL地址
         * Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
         * 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
         * 采用Observable<...>接口
         * getWeatherInfo()是接受网络请求数据的方法
         */
        @GET("weather_mini")
        Observable<WeatherInfo> getWeatherInfo(@Query("city") String city);
       
         //可以直接在GET中加入请求的参数
        @GET("weather_mini?city=青岛")
        Observable<WeatherInfo> getWeatherInfoTwo();
    }
    

          4.创建Retrofit实例及执行请求

    private void getWeatherInfo() {
            //1.创建Retrofit对象
            Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl("http://wthrcdn.etouch.cn/") // 设置网络请求Url
                    .addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(可将得到的Json串转换为对应的WeatherInfo类)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
                    .build();
            //2.创建网络请求接口的实例
            final RetrofitApi request = retrofit.create(RetrofitApi.class);
            //3.采用Observable<...>形式对网络请求进行封装
            Observable<WeatherInfo> observable = request.getWeatherInfo("青岛");
            //4.通过线程切换发送网络请求
            observable.subscribeOn(Schedulers.io())//切换到IO线程进行网络请求
                    .observeOn(AndroidSchedulers.mainThread())//切换回到主线程 处理请求结果
                    .subscribe(new Observer<WeatherInfo>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                        }
    
                        @Override
                        public void onNext(WeatherInfo result) {
                            //5.接收服务器返回的数据
                            Log.e(TAG, "Weather info is: " + result.toString());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "请求失败 : " + e.toString());
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    

          请求后onNext()里面打印如下:

    Weather info is: {"data":{"yesterday":{"date":"10日星期四","high":"高温 28℃","fx":"西南风","low":"低温 20℃","fl":"<![CDATA[2级]]>","type":"小雨"},
                          "city":"青岛",
                          "forest":[{"date":"11日星期五","fengli":"<![CDATA[3级]]>","dengxiang":"南风","high":"高温 25℃","low":"低温 21℃","type":"小雨"},{"date":"12日星期六","fengli":"<![CDATA[3级]]>","dengxiang":"东风","high":"高温 25℃","low":"低温 20℃","type":"小雨"},{"date":"13日星期天","fengli":"<![CDATA[2级]]>","dengxiang":"东风","high":"高温 26℃","low":"低温 20℃","type":"阴"},{"date":"14日星期一","fengli":"<![CDATA[2级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 22℃","type":"晴"},{"date":"15日星期二","fengli":"<![CDATA[4级]]>","dengxiang":"南风","high":"高温 27℃","low":"低温 20℃","type":"小雨"}],
                          "ganmao":"感冒低发期,天气舒适,请注意多吃蔬菜水果,多喝水哦。","wendu":"22"},
    "status":1000,
    "desc":"OK"}
    
    c.功能防抖及联想搜索优化

          功能防抖主要是为了频繁点击只处理其中一次事件;联想搜索优化主要是为了在输入文字时不频繁去服务器进行请求。以上两种实现都是在observer中对onNext()事件进行拦截处理,看是否满足条件,满足就执行下一步observer.onNext(),不满足就不往下执行。
          /*对控件点击及文字输入进行监听,采用了RxBinding,需要加入依赖

    // Rxbinding
    implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
    

          功能防抖实现:

    //使用throttleFirst(1, TimeUnit.SECONDS):参数1:指定的时间段内;参数2:指定时间的单位
    //在1s内多次点击,只响应第一次
    private void throttleOperation() {
        RxView.clicks(clickBtn).throttleFirst(1, TimeUnit.SECONDS)
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(
                    new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Object value) {
                            Log.e(TAG, "执行点击事件");
                            getWeatherInfo();
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("Seven", "对throttle Complete事件作出响应");
                        }
                    });
        }
    //频繁点击,输出如下:
    09-11 10:19:15.138 E/TAG   (31259): 执行点击事件
    09-11 10:19:16.221 E/TAG   (31259): 执行点击事件
    09-11 10:19:17.306 E/TAG   (31259): 执行点击事件
    

          看一下关键代码实现是在ObservableThrottleFirstTimed.java里面的DebounceTimedObserver类:

    static final class DebounceTimedObserver<T>
        extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, Runnable {
            ......
            ......
            @Override
            public void onNext(T t) {
                if (!gate && !done) {
                    gate = true;
    
                    actual.onNext(t);
    
                    Disposable d = get();
                    if (d != null) {
                        d.dispose();
                    }
                    DisposableHelper.replace(this, worker.schedule(this, timeout, unit));
                }
            }
    
            @Override
            public void run() {
                gate = false;
            }
            ......
            ......
        }
    

          通过以上逻辑可以看到,在首次执行完onNext()后,会将gate设为true,然后执行worker.schedule(this,timeout, unit),延时timeout执行该runnable,在run()中把gate设为false,下次onNext()就可以执行了。
          联想搜索优化:

    //联想搜索优化,根据指定时间过滤事件的过滤操作符
    //比如在搜索框输入文字时,在指定时间内不再有文字输入时,才会发送请求,否则不发送
    //若在这段时间内,输入框有文字输入或变化,则继续等待该段时间,循环上述过程
    private void debounceOperation() {
        RxTextView.textChanges(editTxt)
                    .debounce(1, TimeUnit.SECONDS).skip(1)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<CharSequence>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
                        @Override
                        public void onNext(CharSequence charSequence) {
                            resultTxt.setText("服务器请求字符串 = " + charSequence.toString());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                          
                        }
    
                        @Override
                        public void onComplete() {
    
                       }
                    });
        }
    

          看一下关键代码实现是在ObservableDebounceTimed.java里面的DebounceTimedObserver类:

    static final class DebounceTimedObserver<T> implements Observer<T>, Disposable {
            ......
            ......
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                long idx = index + 1;
                index = idx;
                Disposable d = timer.get();
                if (d != null) {
                    d.dispose();
                }
                DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
                if (timer.compareAndSet(d, de)) {
                    d = worker.schedule(de, timeout, unit);
                    de.setResource(d);
                }
    
            }
            ......
            ......
            void emit(long idx, T t, DebounceEmitter<T> emitter) {
                if (idx == index) {
                    actual.onNext(t);
                    emitter.dispose();
                }
            }
        }
    
        static final class DebounceEmitter<T> extends AtomicReference<Disposable> implements Runnable, Disposable {
            ......
            ......
            @Override
            public void run() {
                if (once.compareAndSet(false, true)) {
                    parent.emit(idx, value, this);
                }
            }
            .......
        }
    

          通过以上逻辑可以看到,在执行onNext()时并没有直接执行下一个observer.onNext(),而是执行了worker.schedule(de,timeout, unit),延时timeout执行该runnable,在run()中执行该observer的emit(),然后在执行下一个observer的onNext()。

    d.操作符使用案例**

          merge()

    /**
     * 使用Merge操作符,合并两个Observable
     * 如果两个Observable<T>,T是相同的类型,比如String, 那么在subscribe里面的new Observer<String>和 onNext(String)
     * 如果两个Observable<T>,T不是相同的类型,如下: 那么在subscribe里面的new Observer<Object>和 onNext(Object)即可
     */
    private void mergeOperation() {
        //设置第1个Observable:获取字符串
        Observable<String> strOb = Observable.just("字符串");
    
        //设置第2个Observable:请求获取整数
        Observable<Integer> intOb = Observable.just(1000);
    
        //通过merge()合并事件 & 同时发送事件
        Observable.merge(strOb, intOb)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Object value) {
                            if (value instanceof String) {
                                result += value;
                            } else {
                                Log.d(TAG, "数据为: " + (Integer)value);
                            }
                        }
    
                        @Override
                        public void onError(Throwable e) {
                        }
    
                        // 接收合并事件后,统一展示
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "获取数据完成");
                            setMergeTxt(result);
                        }
                    });
        }
    //打印如下:
    09-11 10:31:14.873 D/TAG   (31259): 数据为: 1000
    09-11 10:31:14.873 D/TAG   (31259): 获取数据完成
    09-11 10:31:14.875 D/TAG   (31259): result is: 字符串
    

          zip()

    @SuppressLint("CheckResult")
    private void zipOperation() {
        Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl("http://fy.iciba.com/")
                    .addConverterFactory(GsonConverterFactory.create()) 
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
    
        RetrofitApi request = retrofit.create(RetrofitApi.class);
    
        //采用Observable<...>形式 对 2个网络请求 进行封装
        //即2个网络请求异步 & 同时发送
        Observable<Translation> observable = request.getTranslation().subscribeOn(
                    Schedulers.io()); // 新开线程进行网络请求1
        Observable<Translation1> observable1 = request.getTranslationTwo().subscribeOn(
                    Schedulers.io());// 新开线程进行网络请求2
    
        // 通过使用Zip()对两个网络请求进行合并再发送
        Observable.zip(observable, observable1,
                    new BiFunction<Translation, Translation1, String>() {
                        // 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
                        @Override
                        public String apply(Translation translation,
                                Translation1 translation1) throws Exception {
                            return translation.show() + " & " + translation1.show();
                        }
                    }).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        // 成功返回数据时调用, Consumer<T>, T对应apply的返回值,apply对应BiFunction最后的参数类型
                        @Override
                        public void accept(String combineInfo) throws Exception {
                            // 结合显示2个网络请求的数据结果
                            Log.d(TAG, "最终接收到的数据是:" + combineInfo);
                            setZipTxt(combineInfo);
                        }
                    }, new Consumer<Throwable>() {
                        // 网络请求错误时调用
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.d(TAG, "请求失败: " + throwable.toString() );
                        }
        });
    }
    
    e.其他

          RxJava对应很多操作符供开发使用,详情请参考大神整理的以下表格:


    RxJava操作符.png

    六.总结

          基于事件流的异步操作库,事件流是通过subscribe触发的(实际上是subscribeActual),每个Observable子类的subscribeActual实现逻辑不同。
          从下往上订阅,不断执行Observable的subscribeActual();
          事件从上往下发射,不断执行Observer的onNext()等方法。
    非常感谢网上大神整理的一些文章,学习之后记录下来备后续使用。

    相关文章

      网友评论

          本文标题:RxJava原理分析

          本文链接:https://www.haomeiwen.com/subject/nebpektx.html