美文网首页
RxJava 初探

RxJava 初探

作者: Sunny_An | 来源:发表于2016-04-12 01:30 被阅读336次

    前言

    去年无意间知道了RxJava这个东东,但一直没时间去看看。最近,终于有了不少时间,经过两周的学习,对RxJava有了初步的认识。下面我会记录一些个人认为比较重要的知识,以备以后查看(并没有深层的原理解析,仅仅在应用层面上)。

    个人对RxJava的理解

    RxJava是一个对数据流和事件流操作的库,它是对数据的一种链式操作,在操作过程中,可以方便的进行数据加工,线程切换,从而避免了复杂的嵌套,缩进,提高代码的可读性,可维护性。

    RxJava的使用

    1. 简单的例子
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(20);
                }
            }).map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return integer.toString();
                }
            }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
            });
    

    Observable.create()可以用来创建一个被观察者,Observable.OnSubscribe中有被订阅时执行的操作。
    这时可以使用操作符对得到的Observable进行一些变换。比如map(),对单一的数据流进行变换操作,如将Integer转为String类型。每进行一次变换,就得到了一个新的Observable
    最后,使用subscribe()方法,对其进行订阅操作。其中Subscriber为订阅者。

    1. 常用的创建操作符
    • create() 从头创建一个Observable
    • just() 将对象或者对象集合转换为一个会发射这些对象的Observable
            Observable.just(1, 2, 3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
    
    • from() 将其它的对象或数据结构转换为Observable,比如数组
            Integer[] numbers = new Integer[]{1, 2, 3};
            Observable.from(numbers)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
    
    1. 线程控制
      在不指定线程时,RxJava遵循线程不变原则,在哪个线程中执行subscribe(),就在哪个线程产生事件,消费事件。如需切换线程,需要用到Schedules(调度器)。
    • 常用的调度器
      • Schedulers.immediate() 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
      • Schedulers.newThread() 总是启用新线程,并在新线程执行操作。
      • Schedulers.io() I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
      • Schedulers.computation() 计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则 I/O操作的等待时间会浪费CPU。
      • AndroidSchedulers.mainThread() Android专用的Schedule,指定操作在Android主线程(UI线程)中运行。
    • 线程控制操作符
      • subscribeOn() 指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。subscribeOn()的位置放在哪里都可以,但它是只能调用一次。不过,可以在Observable.doOnSubscribe()后使用subscribeOn()来指定subscribe()时,执行doOnSubscribe()所在线程
            Integer[] numbers = new Integer[]{1, 2, 3};
            Observable.from(numbers)
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            System.out.println("toast  " + Thread.currentThread().getName());
                            Toast.makeText(MainActivity.this, "hello", Toast.LENGTH_SHORT).show();
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())  //①
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer + "  " + Thread.currentThread().getName());
                        }
                    });
    

    如没有①处的subscribeOn(AndroidSchedulers.mainThread())doOnSubscribe将在当前执行subscribe()的线程中运行。
    - observeOn() 指定Subscriber所运行在的线程,或者叫做事件消费的线程。这里的Subscriber可以是变换以后的,换句话说,observeOn()指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()即可。

            Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .map(mapOperator) // 新线程,由 observeOn() 指定
                    .observeOn(Schedulers.io())
                    .map(mapOperator2) // IO 线程,由 observeOn() 指定
                    .observeOn(AndroidSchedulers.mainThread)
                    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
    
    圈起来的的方法所在的工作线程,受相应颜色的线程控制操作符影响

    说明

    这篇笔记是在看完 扔物线 大大的 给 Android 开发者的 RxJava 详解 一文后的笔记,并引用了文章里的部分代码和图片,仅供个人学习,还请大大不要责怪(●'◡'●)。这也是我写的第一篇文章,好开心,以后会坚持下去的。在水平提高后,也会努力写一些原创文章,与大家共同进步~

    相关文章

      网友评论

          本文标题:RxJava 初探

          本文链接:https://www.haomeiwen.com/subject/hrgilttx.html