美文网首页Android开发
RxJava技术探索及其Android中的应用

RxJava技术探索及其Android中的应用

作者: 水大云霄 | 来源:发表于2016-07-21 13:11 被阅读259次

    1、RxJava简介

    RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,Rx的全称是Reactive Extensions,直译过来就是响应式扩展。Rx基于观察者模式,他是一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。RxJava说白了就是用Java语言边写的使用各种操作符形成链式结构来处理异步数据流的工具。

    2、RxJava基础及常用操作符

    RxJava 有四个基本概念:
    Observable(可观察者,即被观察者);
    Observer(观察者);
    subscribe(订阅)事件;
    Observable和Observer通过 subscribe() 方法实现订阅关系,从而 Observable可以在需要的时候发出事件来通知 Observer。

    (1) 创建 Observable被观察者

    Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {    
    @Override    
    public void call(Subscriber<? super String> subscriber) {  
        subscriber.onNext("Hello, RxJava!");        
        subscriber.onCompleted();    
    }});
    

    (2) 创建 Observer观察者

    Subscriber<String> mySubscriber = new Subscriber<String>() {   
        @Override    
        public void onCompleted() { }   
        @Override    
        public void onError(Throwable e) { }    
        @Override    
        public void onNext(String s) {       
        Log.d("haijiang", "--->" + s); }
    };
    

    Subscriber是Observer的扩展,在使用过程中,我们直接用Subscriber作为观察者对象就OK了!

    (3) 建立订阅关系

    myObservable.subscribe(mySubscriber);
    

    一旦建立订阅关系,OnSubscribe中的call方法就会被调用,在call方法中主动触发了观察者的onNext,onCompleted方法,可看到输出“Hello, RxJava!” 。
    以上是一个最基本的流程,我们可以写成链式调用:

    Observable.create(new Observable.OnSubscribe<String>() {    
    @Override    
    public void call(Subscriber<? super String> subscriber) {  
        subscriber.onNext("Hello, RxJava!");        
        subscriber.onCompleted();    
    }}).subscribe(new Subscriber<String>() {   
        @Override    
        public void onCompleted() { }   
        @Override    
        public void onError(Throwable e) { }    
        @Override    
        public void onNext(String s) {       
        Log.d("haijiang", "--->" + s); }
    });
    
    

    更多操作符欣赏:

    just

    subscribe方法有一个重载版本,接受三个Action1类型的参数,分别对应OnNext,OnComplete, OnError函数 
    Observable.just("hello","rxjava","rxandroid").subscribe(new Action1<String>() {    
       @Override    
       public void call(String s) {       
            Log.d("haijiang", "--->" + s);   
     }});
    

    ActionX没有返回,还有一种FuncX有返回的操作,后面会说。
    just操作符是创建一个Observable,一次发送传入的参数。

    from

    /** * Observable.from()方法,它接收一个集合作为输入 */
    String[] strArrsy = {"hello","rxjava","rxandroid"};
    Observable.from(strArrsy).subscribe(new Action1<String>() {    
       @Override    
       public void call(String s) {        
            Log.d("haijiang", "--->" + s);    
    }});
    

    下面介绍两个重量级操作符map和flatMap,核心变换,灵活操作。

    map

    map是一对一的变化,将一个Observable<T>变换成Observable<R>

    Observable.just("I LOVE YOU!").map(new Func1<String, Integer>() {   
        @Override    
        public Integer call(String s) {        
             return 520;    
       }}).subscribe(new Action1<Integer>() {    
        @Override    
        public void call(Integer s) {       
             Log.d("haijiang", "--->" + s);   
     }});
    

    通过map,Func1将String转换成了Int;其中FuncX是RxJava的一个包装接口,跟ActionX类似,只不过FuncX是有返回对象的。

    flatMap

    flatMap()中返回的是个 Observable对象,并且这个 Observable对象并不是被直接发送到了 Subscriber
    的回调方法中。flatMap()的原理是这样的:

    1. 使用传入的事件对象创建一个 Observable对象;
    2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
    3. 每一个创建出来的 Observable发送的事件,都被汇入同一个 Observable,而这个 Observable负责将这些事件统一交给Subscriber 的回调方法。
      这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
    private ArrayList<Data> mData;
    Observable.from(mData).flatMap(new Func1<Data, Observable<ChildData>>() {
          @Override 
          public Observable<ChildData> call(Data data) { 
                 return Observable.from(data.getChildData); 
            } 
    }).subscribe(new Action1<ChildData>() { 
          @Override 
          public void call(ChildData cd) {
                 Log.d("haijiang", "--->" + cd.getName); 
      }});
    

    concatMap

    flatMap()操作符使用你提供的原本会被原始Observable发送的事件,来创建一个新的Observable。而且这个操作符,返回的是一个自身发送事件并合并结果的Observable。可以用于任何由原始Observable发送出的事件,发送合并后的结果。记住,flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。为了防止交错的发生,可以使用与之类似的concatMap()操作符。综上所述,就是利用concatMap替换flatMap操作符,输入顺序就防止了交错,跟原始Obervable顺序一致。

    timer()

    timer操作符:用于创建Observabl,延迟发送一次。
    下面延时两秒,输出log

    Observable.timer(2, TimeUnit.SECONDS)
                  .subscribe(new Observer<Long>() {
                      @Override
                      public void onCompleted() {
                          log.d ("completed");
                      }
    
                      @Override
                      public void onError(Throwable e) {
                          log.e("error");
                      }
    
                      @Override
                      public void onNext(Long number) {
                          log.d ("hello world");
                      }
                  });
    
    

    interval()

    interval:用于创建Observable,用于每个XX秒循环进行某个操作

    Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() { 
         @Override 
         public void call(Long aLong) { /
        /TODO WHAT YOU WANT 
       } 
    });
    

    delay()

    delay:用于事件流中,可以延迟发送事件流中的某一次发送。

    retryWhen

    retryWhen()是RxJava的一种错误处理机制,当遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable。下面封装一个处理网络错误的类

    public class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {
    
    private long mInterval;
    
        public RetryWhenProcess(long interval) {
    
            mInterval = interval;
        }
    
        @Override
        public Observable<?> call(Observable<? extends Throwable> observable) {
            return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable throwable) {
                    return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                if (throwable instanceof UnknownHostException) {
                                    return Observable.error(throwable);
                                }
                                return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
                                    @Override
                                    public Integer call(Throwable throwable, Integer i) {
    
                                        return i;
                                    }
                                }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                                    @Override
                                    public Observable<? extends Long> call(Integer retryCount) {
    
                                        return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
                                    }
                                });
                            }
                        });
                }
            });
        }
    }
    

    使用方法:

    .retryWhen(new RetryWhenProcess(5))
    

    compose

    compose()是针对 Observable自身进行变换。假设在程序中有多个 Observable,并且他们都需要应用一组相同的变换可以使用。
    场景是这样的:work thread 中处理数据,然后 UI thread 中处理结果。当然,我们知道是要使用 subscribeOn() 和 observeOn() 进行处理。最常见的场景是,调server 的 API 接口取数据的时候,那么,那么多接口,反复写这两个操作符是蛋疼的,为了避免这种情况,我们可以通过 compse() 操作符来实现复用,下面面这段代码就实现了这样的功能。

    /**
     * 这个类是 小鄧子 提供的!
     */
    public class SchedulersCompat {
        private static final Observable.Transformer computationTransformer =
                new Observable.Transformer() {
                    @Override public Object call(Object observable) {
                        return ((Observable) observable).subscribeOn(Schedulers.computation())
                                .observeOn(AndroidSchedulers.mainThread());
                    }
                };
        private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable) observable).subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
        private static final Observable.Transformer newTransformer = new Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable) observable).subscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
        private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable) observable).subscribeOn(Schedulers.trampoline())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
        private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
        /**
         * Don't break the chain: use RxJava's compose() operator
         */
        public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
            return (Observable.Transformer<T, T>) computationTransformer;
        }
        public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
            return (Observable.Transformer<T, T>) ioTransformer;
        }
        public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
            return (Observable.Transformer<T, T>) newTransformer;
        }
        public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
            return (Observable.Transformer<T, T>) trampolineTransformer;
        }
        public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
            return (Observable.Transformer<T, T>) executorTransformer;
        }
    }
    

    使用方式:

    .compose(SchedulersCompat.ioTransformer );
    

    3、线程控制 — Scheduler

    在RxJava 中,Scheduler—调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Schedule,它们已经适合大多数的使用场景:
    Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()比 newThread()更有效率。不要把计算工作放在 io()中,可以避免创建不必要的线程。
    Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    有了这几个 Scheduler,就可以使用 subscribeOn() 和 observeOn()两个方法来对线程进行控制。subscribeOn() 指定subscribe()所发生的线程,即 Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。 observeOn()指定Subscriber所运行在的线程,或者叫做事件消费的线程。

    4RxJava在android开发中的一些应用

    参考:可能是东半球最全的RxJava使用场景小结 (http://blog.csdn.net/theone10211024/article/details/50435325)

    RxBinding
    节流(防止按钮的重复点击)
    轮询,
    定时操作
    RxPermissions
    RxBus
    RxJava与Retrofit

    (1)RxBinding

    RxBindingJakeWharton大牛用RxJava为Android控件编写的一个控件绑定库。
    例子:

    Button button = (Button) findViewById(R.id.button); 
    RxView.clicks(button).subscribe(new Action1<Void>() { 
    @Override 
    public void call(Void aVoid) { 
    Log.i("test", "clicked"); 
    }
     });
    

    (2)防止重复点击

    RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
                @Override
                public void call(Void aVoid) {
                    Log.i("test", "clicked");
                }
            });
    

    (3)EditText输入请求。避免每次输入产生频繁的请求

    RxTextView.textChangeEvents(inputEditText)
          .debounce(400, TimeUnit.MILLISECONDS) 
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<TextViewTextChangeEvent>() {
        @Override
        public void onCompleted() {
            log.d("onComplete");
        }
    
        @Override
        public void onError(Throwable e) {
            log.d("Error");
        }
    
        @Override
        public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
            log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
        }
    });
    
    

    (4)RxPermissions

    RxPermissions也是国外的大牛开发的基于RxJava的Android权限管理库,他让6.0以上的权限管理更加的简单,如果有适配6.0以上的手机的需求,这个库是个不错的选择。下面我们来看看基本的用法。

     // 请求相机权限
        RxPermissions.getInstance(this)
        .request(Manifest.permission.CAMERA)
        .subscribe(granted -> {
            if (granted) { // 用户同意了(在6.0之前的手机始终都为true)
              //可以拍照了
            } else {
               //可以在这里提示用户,或者再次请求
            }
        });
    

    更多功能研究github吧

    (5)RxBus

    参考:http://www.jianshu.com/p/ca090f6e2fe2
    不多说,上代码

    /**
    * RxBus
    * Created by YoKeyword on 2015/6/17.
    */
    public class RxBus {
        private static volatile RxBus defaultInstance;
    
        private final Subject<Object, Object> bus;
        // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
        public RxBus() {
          bus = new SerializedSubject<>(PublishSubject.create());
        }
        // 单例RxBus
        public static RxBus getDefault() {
            if (defaultInstance == null) {
                synchronized (RxBus.class) {
                    if (defaultInstance == null) {
                        defaultInstance = new RxBus();
                    }
                }
            }
            return rxBus;
        }
        // 发送一个新的事件
        public void post (Object o) {
            bus.onNext(o);
        }
        // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
        public <T> Observable<T> toObservable (Class<T> eventType) {
            return bus.ofType(eventType);
    //        这里感谢小鄧子的提醒: ofType = filter + cast
    //        return bus.filter(new Func1<Object, Boolean>() {
    //            @Override
    //            public Boolean call(Object o) {
    //                return eventType.isInstance(o);
    //            }
    //        }) .cast(eventType);
        }
    }
    

    (6)RxJava 与 Retrofit 结合的最佳实践

    参考扔物线文章:http://gank.io/post/56e80c2c677659311bed9841

    参考:
    1、http://www.jianshu.com/users/df40282480b4/latest_articles
    2、http://gank.io/post/560e15be2dca930e00da1083#toc
    3、http://www.jianshu.com/p/8cf84f719188

    相关文章

      网友评论

        本文标题:RxJava技术探索及其Android中的应用

        本文链接:https://www.haomeiwen.com/subject/rxvejttx.html