RxAndroid入门

作者: Sean1225 | 来源:发表于2019-03-18 17:56 被阅读0次

    响应式编程

    RxJava提供了响应式编码规范,而RxAndroid是专供Android平台的RxJava(只是针对平台增加了少量类),一般Android开发者口中的RxJava指的便是RxAndroid。关于响应式编程的概念这里不多说,网上一大堆,我就简单说明下响应式编程与传统编程的区别。如果逻辑B依赖于逻辑A,逻辑C依赖于逻辑B,那么:

    1. 传统编程中的作法是在A执行完成后去执行B,B执行完毕后执行C,除了A、B、C本身的逻辑外,这部分由依赖产生的先后执行逻辑也是由开发者编写。
    2. 响应式编程中,会先将A、B、C做个依赖绑定,A->B->C,就像水流一样,从A流到B,再从B流到C,依赖产生的先后执行逻辑由语言或者库提供支持,开发者只需要编写A、B、C本身的逻辑以及告诉提供方三者间的关系即可。换句话说,就是B会自动响应A的执行结果,C会自动响应B的执行结果。

    响应式编程可以很优雅地处理任务间(或者业务间)的依赖关系,而这关系就像流一样,并且多用于异步场景,且由于大家熟知的响应式编程库RxJava的主要应用场景也是异步,主要应用手段是流,因此不少文章简单粗暴地将响应式编程等同是异步+流,这个是比较片面的。

    示例

    我们来看个具体的例子,加深下理解。本例中需要处理三个任务,分别为A、B、C,这三个任务都需要在工作线程中执行,A执行完以后延迟1S执行B,B执行完以后延迟1S执行C。

    以下为公共代码,定义了A、B、C三个任务以及定时器:

    public class TaskA implements Runnable {
        @Override
        public void run() {
            // do something
        }
    }
    
    public class TaskB implements Runnable {
        @Override
        public void run() {
            // do something
        }
    }
    
    public class TaskC implements Runnable {
        @Override
        public void run() {
            // do something
        }
    }
    
    public class Timer {
        private static final Timer INSTANCE = new Timer();
        private Handler mHandler;
    
        private Timer() {
            HandlerThread thread = new HandlerThread("timer");
            thread.start();
            mHandler = new Handler(thread.getLooper());
        }
    
        public static Timer get() {
            return INSTANCE;
        }
    
        public void post(Runnable runnable) {
            mHandler.post(runnable);
        }
    
        public void postDelayed(Runnable runnable, long delay) {
            mHandler.postDelayed(runnable, delay);
        }
    }
    

    传统编程

    Timer.get().post(new Runnable() {                          
           @Override                                            
           public void run() {                                  
               new TaskA().run();                               
               Timer.get().postDelayed(new Runnable() {         
                   @Override                                    
                   public void run() {                          
                       new TaskB().run();                       
                       Timer.get().postDelayed(new Runnable() { 
                           @Override                            
                           public void run() {                  
                               new TaskC().run();               
                           }                                    
                       }, 1000);                                
                   }                                            
               }, 1000);                                        
           }                                                    
       });                                  
    

    传统编程方式的问题非常突出,嵌入层次太多,随着依赖关系的增多以及复杂化,这样的代码会变得极其臃肿且不易阅读和维护。

    响应式编程

    首先我们需要编写一个支持响应式编程规范的库,代码如下(这个类很多情况没有考虑到,仅仅是为了演示用):

    public class StreamTimer implements Runnable {
        private List<Task> mTasks = new LinkedList<>();
    
        public StreamTimer() {
    
        }
    
        public StreamTimer next(Runnable runnable) {
            return next(runnable, 0);
        }
    
        public StreamTimer next(Runnable runnable, long delay) {
            Task task = new Task(runnable, delay);
            mTasks.add(task);
            return this;
        }
    
        public void startup() {
            startNextTimer();
        }
    
        private void startNextTimer() {
            if(mTasks.isEmpty()) {
                return;
            }
            Task task = mTasks.get(0);
            Timer.get().postDelayed(this, task.delay);
        }
    
        @Override
        public void run() {
            Task task = mTasks.remove(0);
            task.runnable.run();
            startNextTimer();
        }
    
        private class Task {
            Runnable runnable;
            long delay;
    
            Task(Runnable runnable, long delay) {
                this.runnable = runnable;
                this.delay = delay;
            }
        }
    }
    

    执行A、B、C任务的代码如下:

    new StreamTimer()           
       .next(new TaskA())       
       .next(new TaskB(), 1000) 
       .next(new TaskC(), 1000) 
       .startup();              
    

    next表示新增一个任务且需要在上一次任务执行完毕后才执行。以上代码非常简洁且可读性很强,任务间的依赖关系非常清晰,拓展也非常简单,新增任务时只需要在合适的地方插入next方法即可。

    比起上述例子,RxJava的功能要强大得多,也复杂得多,上述例子只是为了让新手能快速掌握响应式编程及流式调用(也称为链式调用),接下去我们开始讲解RxJava。

    RxAndroid基本使用

    使用RxAndroid需要在build.gradle中加入如下依赖:

    compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
    compile 'io.reactivex.rxjava2:rxjava:2.2.7'
    

    RxAndroid也直接去https://github.com/ReactiveX/RxAndroid下载源码,里面就不到10个类。

    Observable和Observer

    1. Observable:被观察者,用来处理事件的派发。
    2. Observer:观察者,观察目标为Observable,Observable派发出来的事件将被它处理,一个Observable可以有多个Observer。当Observable有变化时,Observer能够立即响应这些变化。

    熟悉观察者模式的同学应该对这两者有非常深刻的认识了,它们是RxJava中最基础的东西,RxJava中其他的对象、方法、操作符都是围绕这二者进行或拓展的。来看下最简单的例子:

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
    });
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            log("onSubscribe");
        }
        @Override
        public void onNext(String s) {
            log("onNext:" + s);
        }
        @Override
        public void onError(Throwable e) {
            error("onError", e);
        }
        @Override
        public void onComplete() {
            log("onComplete");
        }
    };
    observable.subscribe(observer);
    

    日志输出如下:

    onSubscribe
    onNext:A
    onNext:B
    onNext:C
    onComplete
    


    来看下上述例子中Observable相关的对象和方法:

    1. 使用Observable.create创建Observable对象。
    2. ObservableEmitter为发射器,Observable使用它发射事件给所有Observer
    3. 使用Observable.subscribe添加一个Observer


    接着来看下Observer的几个方法:

    1. onSubscribe:在订阅observable时回调,可以在这里调用Disposable.dispose取消订阅或者将Disposable对象保存起来以便在后续某个时刻取消订阅。
    2. onNext:在ObservableEmitter.onNext执行后回调,onNext表示的是整个响应链中的一环,在这里处理响应链中的其中一个任务,可以多次调用。
    3. onComplete:在ObservableEmitter.onComplete执行后回调,表示任务已全部完成,可以在这里做收尾工作。
    4. onError:在ObservableEmitter.onError执行后或者链中任一环节出现异常时回调,表示任务执行失败。

    除了onSubscribe,其它几个方法有如下特点:

    1. ObservableEmitter中的同名方法一一对应,在ObservableEmitter的同名方法执行后回调。
    2. onCompleteonError互斥,两者只能触发其中之一,且触发后onNext便不会再触发。
    3. onComplete触发后,后续ObservableEmitter调用任何方法都不会再生效。
    4. onError触发后,如果ObservableEmitter再调用onError或者onComplete,RxJava会抛出异常,开发者需要自行保证唯一性。(处理方式为什么不同onComplete?)

    需要特别注意的,ObservableOnSubscribe.subscribe方法在每次有新的Observer加入时,都会在Observer.onSubscribe回调之后触发,这就保证了所有的Observer都能接收到事件。

    subscribe

    在上面提到了通过subscribeObservableObserver建立了绑定关系,我们来看下方法原型:

    void subscribe(Observer<? super T> observer)
    

    除此之外,subscribe还有很多重载方法,我们来看下所有的方法原型:

    Disposable subscribe();
    Disposable subscribe(Consumer<? super T> onNext);
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
    Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        subscribe(ls);
        return ls;
    }
    

    以上方法最终都会调用到参数最多的那个方法,而该方法内部又调用了subscribe(Observer<? super T> observer),这些方法相对来说更为简洁易用。可以发现新增了两个类,分别是ConsumerAction,其中ObserveronNextonErroronSubscribe方法被Consumer.accept方法取代,onComplete方法被Action.run方法取代。因此,如果我们仅仅只关心Observer的其中一个或多个回调,那么便可以通过ConsumerAction来代替Observer注册到Observable中。如上述的例子中,如果我们只关心onNext,那么可以这么使用:

    Observable<String> observable = Observable.create(new ObservableOnSubscribe
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
    });
    observable.subscribe(
            new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    log("onNext:" + s);
                }
                }, Functions.<Throwable>emptyConsumer(),
            new Action() {
                @Override
                public void run() throws Exception {
                    log("onComplete");
                }
            }
    );
    

    使用lambda表达式之后的代码如下:

    Observable<String> observable = Observable.create(
            (ObservableEmitter<String> emitter) -> {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onComplete();
            }
        );
    observable.subscribe(
            (String s) -> log("onNext:" + s),
            Functions.<Throwable>emptyConsumer(),
            () -> log("onComplete")
    );
    

    代码显得简洁优雅易读得多,实际上RxJava是为响应式编程和函数式编程而生,因此使用lambda表达式才能完美的使用RxJava。

    Android使用lambda表达式需要使用jdk1.8,且gradle版本需要4.0以上(也可能是3.x某个版本,记不住了,4.0以上准没错),在build.gradle中加入如下代码:

    android {
        compileOptions {
            sourceCompatibility > JavaVersion.VERSION_1_8
            targetCompatibility > JavaVersion.VERSION_1_8
        }
    }
    

    线程调度

    以下称Observer.onXXX回调时所在线程为Observer工作线程,Observable发射事件时所在线程为Observable工作线程。默认情况下,ObserverObservable工作线程是同一个,该线程即调用Observable.subscribe时所在的线程。然而,异步调用才是RxJava的核心应用场景,下面我们来看下如何改变ObserverObservable的工作线程。使用Observable.subscribeOn配置Observable工作线程,使用Observable.observeOn配置Observer工作线程。很多情况下,置Observable工作线程位于子线程中,因为可能存在网络请求、数据存取等耗时操作;而Observer工作线程位于主线程,因为接收到事件后需要刷新UI。下面来看下这种场景下的应用示例:

    Observable<String> observable = Observable.create(
            (ObservableEmitter<String> emitter) -> {
                log("emit thread=" + Thread.currentThread().getName());
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onComplete();
            }
    );
    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    (String s) -> log("onNext:" + s + " thread=" + Thread.currentThread().getName()),
                    Functions.<Throwable>emptyConsumer(),
                    () -> log("onComplete thread=" + Thread.currentThread().getName())
            );
    

    日志输出如下:

    emit thread=RxNewThreadScheduler-2
    onNext:A thread=main
    onNext:B thread=main
    onNext:C thread=main
    onComplete thread=main
    

    可以看到线程调度确实如我们所期望的了。需要特别强调的subscribeOnobserveOn返回的不是原本的Observable对象,因此如果没有采用链式调用,在调用这两个方法之后必须重新赋值给Observable对象,如:

    // 错误调用
    observable.subscribeOn(xxx)
        .observeOn(xxx);
    observable.subscribe(xxx);
    
    // 正确调用
    observable = observable.subscribeOn(xxx)
        .observeOn(xxx);
    observable.subscribe(xxx);
    

    如果重复调用subscribeOnobserveOn会怎么样呢,我们来看段代码:

    Observable<String> observable = Observable.create(
            (ObservableEmitter<String> emitter) -> {
                log("emit thread=" + Thread.currentThread().getName());
                emitter.onNext("A");
            }
    );
    observable = observable.subscribeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.single())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.single());
    observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
    

    日志输出如下:

    emit thread=main
    onNext1:A thread=RxSingleScheduler-1
    

    可以得出如下临时结论:

    1. subscribeOn以第一次调用为准。
    2. observeOn以最后一次调用为准。

    我们再来看下更复杂的例子:

    Observable<String> observable = Observable.create(
            (ObservableEmitter<String> emitter) -> {
                log("emit thread=" + Thread.currentThread().getName());
                emitter.onNext("A");
            }
    );
    observable = observable.subscribeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.single())
            .observeOn(AndroidSchedulers.mainThread())
            .observeOn(Schedulers.single());
    observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
    observable = observable.subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io()) ;
    observable.subscribe((String s) -> log("onNext2:" + s + " thread=" + Thread.currentThread().getName()));
    

    日志输出如下:

    emit thread=main
    emit thread=main
    onNext1:A thread=RxSingleScheduler-1
    onNext2:A thread=RxCachedThreadScheduler-2
    

    现在,我们可以得出最终结论了:

    1. subscribeOn以第一次调用为准。
    2. observeOn以调用subscribe前的最后一次调用为准,每个subscribe单独计算。

    由此可知,我们可以让不同的Observer在不同线程中调度。

    RxJava使用Scheduler来表示线程调度,上面提到的Schedulers.newThread()AndroidSchedulers.mainThread()都是由RxJava提供的Scheduler实现类。一般我们不需要手动去实现Scheduler,而是通过Schedulers或者AndroidSchedulers(Android专用)获取。下面分别来看下二者所提供的创建能力。

    Schedulers

    newThread

    大致等同于new Thread(runnable).start();,线程数没有上限,除了测试场景一般不会用到它。

    io

    用于I/O操作场景,线程数没有上限。与newThread比较相似,区别在于该调度器的内部使用了一个无数量上限的线程池,可以复用空闲的线程,因此效率更高。

    computation

    用于计算场景,计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,因此不要把I/O操作放在这里。该类型的Scheduler使用固定数量的线程池,数量为处理器核数。除了阻塞(包括I/O操作、wait等)外,其他操作都可以使用该调度器,不过通常用处理事件循环,大数据运算等。

    single

    单线程调度,所有任务都需要排队依次运行。

    trampoline

    任务在当前线程运行。

    from(Executor executor)

    使用指定的线程池调度。

    AndroidSchedulers

    mainThread

    任务在主线程上运行。

    from(Looper looper)

    任务在指定Looper上调度。

    相关文章

      网友评论

        本文标题:RxAndroid入门

        本文链接:https://www.haomeiwen.com/subject/uhmbmqtx.html