RxJava语法练习

作者: 無名小子的杂货铺 | 来源:发表于2016-11-03 14:53 被阅读997次

很早之前就听Rxjava 多么神奇,各种评论都有,说学习坡度高,难等等,于是亲自上手试试,说实话刚开始看到这些用法立马就蒙比了,和我们之前的方式不太一样,理解起来还是挺别扭的,之后找了一篇给 Android 开发者的 RxJava 详解 扔物线的,好好研究上几天,认真理解每个例子并自己动手敲出来,也就慢慢能体会到 rxjava 的好用之处了。

刚开始学习的时候,为了加深理解,按照这篇 NotRxJava懒人专用指南,认认真真敲了一遍很有用,推荐给大家。

内容包含如下:
如何获取,观察者模式,基本语法,操作符使用,线程控制使用

注意:以下部分内容部分段落部分引用自给 Android 开发者的 RxJava 详解,如有冒犯之处,我会及时删除。

一、如何获取

RxJava
RxAndroid
在项目中引入依赖就好了,我这里用的不是最新版本

compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'

二、观察者模式

这个模式基本上都听过,使用过的就可以直接略过了。

先说下设计模式中的观察者模式:

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern) 比如,当一个对象被修改时,则会自动通知它的依赖对象,观察者模式属于行为型模式。

使用在java中实现观察者模式需要用到java.util包中提供的Observable类和Observer接口,java已经给我们提供好类使用。

Observable可以查看java源码,下面是Observer接口:

 public interface Observer {
     void update(Observable observable, Object data);
 }

举个简单例子:

古董被观察,观察者为people用来观察古董价钱波动
被观察者:

 public class Antique extends Observable {
     private float mPrice;// 价钱

     public Antique(float price) {
         this.mPrice = price;
     }

     public float getPrice() {
         return this.mPrice;
     }

     public void setPrice(float price) {
         super.setChanged();
         super.notifyObservers(price);// 价格被改变
         this.mPrice = price;
     }

     public String toString() {
         return "古董价格为:" + this.mPrice;
     }
 }

观察者实现Observer接口,重写update方法即可

 public class People implements Observer{

     private String name;

     public People(String name) {
         this.name = name;
     }

     @Override
     public void update(Observable observable, Object data) {
         Log.e("","People update() -> update name:"+ this.name + ",price:"+ ((Float)data).floatValue());
     }

 }

主函数调用

  Antique house = new Antique(1222f);
  People p1 = new People("p1");
  People p2 = new People("p2");
  People p3 = new People("p3");
  house.addObserver(p1);
  house.addObserver(p2);
  house.addObserver(p3);
  Log.e("", house+""); // 输出价格
  house.setPrice(111f);
  Log.e("", house+""); // 输出价格

这样只要价格发生变化,就会通知所有订阅的人,也就实现了简单的观察者模式。

RxJava 中的观察者模式

观察者:Observer
被观察者:Observable
订阅:subscribe()

观察者

Observer即为观察者,处理事件发生后逻辑,如古董例子,价格改变后,需要updae信息,不过 Rxjava 这里多了几种处理情况,onCompleted(),onError(),onNext(),具体用法看 dmeo 就可以了。

被观察者

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。可以使用create()、just(T...)、from(T[])或from(Iterable<? extends T>)来创建一个 Observable ,并为它定义事件触发规则。

订阅

创建了Observable和Observer之后,再用subscribe()方法将它们联结起来。

三、基本语法

1、打印 Hello World

Observable.create(new Observable.OnSubscribe<String>() {

     @Override
     public void call(Subscriber<? super String> subscriber) {
         subscriber.onNext("Hello World");
         subscriber.onCompleted();
     }
 }).subscribe(new Observer<String>() {
     @Override
     public void onCompleted() {
         System.out.println("onCompleted:");
     }

     @Override
     public void onError(Throwable e) {
         System.out.println("onError e:"+e);
     }

     @Override
     public void onNext(String s) {
         System.out.println("onNext s:"+s);
     }
 });

//还可以这么写

 Observable.just("hello World").subscribe(new Action1<String>() {
     @Override
     public void call(String s) {
         System.out.println("call s:"+s);
     }
 });

2、将字符串数组 names 中的所有字符串依次打印出来

 String[] names = {"111", "222", "333"};
 Observable.from(names).subscribe(new Action1<String>() {
     @Override
     public void call(String name) {
         System.out.println("test1 name:" + name);
     }
 });
// 输出
test1 name:111
test1 name:222
test1 name:333

另一种写法

 @Test
 public void test4(){
     //1:被观察者
     String [] names ={"111","222","333"};
     Observable observable = Observable.from(names);

     //2:观察者
     Action1 onNextAction = new Action1<String>() {
         @Override
         public void call(String s) {
             System.out.println("test4 call"+s);
         }
     };

     Action1<Throwable> onErrorAction = new Action1<Throwable>() {
         @Override
         public void call(Throwable e) {
             System.out.println("test4 call e:"+e);
       }
     };

     Action0 onCompletedAction = new Action0() {
         @Override
         public void call() {
             System.out.println("test4 call onCompletedAction");
         }
     };
     //3:订阅:被观察者被观察者订阅
     observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
 }

3、just 将传入的参数依次打印

 @Test
 public void test2(){
     Observable.just("1","2","3").subscribe(new Observer<String>() {
         @Override
         public void onCompleted() {
             System.out.println("test2 onCompleted");
         }

         @Override
         public void onError(Throwable e) {
             System.out.println("test2 e:"+e);
         }

         @Override
         public void onNext(String s) {
             System.out.println("onNext:"+s);
         }
     });
 }
   //    onNext:1
  //    onNext:2
 //    onNext:3
 //    test2 onCompleted

4、 循环输出list

 @Test
 public void test5(){
     Observable.from(Data.getCats().get(0).getlist()).subscribe(new Action1<String>() {
         @Override
         public void call(String s) {
             System.out.println("test5 call :"+s);
         }
     });
 }

 //    test5 call :list:0
 //    test5 call :list:1
 //    test5 call :list:2
 //    test5 call :list:3
 //    test5 call :list:4
 //    test5 call :list:5
 //    test5 call :list:6
 //    test5 call :list:7
 //    test5 call :list:8
 //    test5 call :list:9

四、操作符

1、Filter:过滤操作,满足条件才可以通过

@Test
public void FilterTest(){
    Observable.just(1,2,3,4,5,6).filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 3;//只有>3的情况才会通过
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext integer:"+integer);
        }
    });
}
//  onNext integer:4
//  onNext integer:5
//  onNext integer:6
//  onCompleted

2、first,last 只处理第一个或最后一个

@Test
public void FirstLastTest(){
    Observable.just(1,2,3,4,5,6).first()/*.last()*/.subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext integer:"+integer);
        }
    });
}

 //  onNext integer:1
 //  onCompleted

3、 take 函数限制开始个数

@Test
public void TakeTest(){
    Observable.just(1,2,3,4,5,6).take(3).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext integer:"+integer);
        }
    });
}
 //  onNext integer:1
  //  onNext integer:2
  //  onNext integer:3
  //  onCompleted

4、 takeLast:限制最后的个数

@Test
public void TakeLastTest(){
    Observable.just(1,2,3,4,5,6).takeLast(3).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext integer:"+integer);
        }
    });
}

   //  onNext integer:4
  //  onNext integer:5
  //  onNext integer:6
  //  onCompleted

5、scan:累加器函数

@Test
public void ScanTest(){
    Observable.just(1,2,3,4,5,6).scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("ScanTest onCompleted");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("ScanTest onNext interger:"+integer);
        }
    });
}

  //  ScanTest onNext interger:1
  //  ScanTest onNext interger:3
  //  ScanTest onNext interger:6
  //  ScanTest onNext interger:10
  //  ScanTest onNext interger:15
  //  ScanTest onNext interger:21
  //  ScanTest onCompleted
  
  //  先输出1,
  //  然后 1 + 2 = 3,输出3
  //  然后 3 +3 = 6;输出 6
  //  然后 4 + 6; 输出10
  //  都是相加上一次的结果

6、map:遍历list中的name,使用map操作符来获取list中的单项

 @Test
 public void test7(){
     Observable.from(Data.getCats(10)).map(new Func1<Cat, String>() {

       @Override
       public String call(Cat cat) {
           return cat.toCat();//获取cat name
       }
   }).subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {
           System.out.println("test7 onCompleted:");
       }

       @Override
       public void onError(Throwable e) {
           System.out.println("test7 onCompleted e:"+e);
       }

       @Override
       public void onNext(String s) {
           System.out.println("test7 onNext :"+s);
       }
   });
 }

 //  test7 onNext :0
 //  test7 onNext :1
 //  test7 onNext :2
 //  test7 onNext :3
 //  test7 onNext :4
 //  test7 onNext :5
 //  test7 onNext :6
 //  test7 onNext :7
 //  test7 onNext :8
 //  test7 onNext :9
 //  test7 onCompleted:

7、flatMap: 循环List<Cat> -> cat里面的list -> list 中的 String

 @Test
 public void test8() {
     //循环List<Cat> -> cat里面的list -> list 中的 String
     Observable
             .from(Data.getCats())
             .flatMap(new Func1<Cat, Observable<String>>() {
                 @Override
                 public Observable<String> call(Cat cat) {
                     System.out.println("test8 call #########" + cat.toCat());
                     return Observable.from(cat.getlist());
                 }
             })
             .subscribe(new Action1<String>() {
                 @Override
                 public void call(String s) {
                     System.out.println("test8 call :" + s);
                 }
     });
 }
//  部分log
//  test8 call #########Cat0
//  test8 call :0:list
//  test8 call :1:list
//  test8 call :2:list
//  test8 call :3:list
//  test8 call :4:list
//  test8 call #########Cat1
//  test8 call :0:list
//  test8 call :1:list
//  test8 call :2:list
//  test8 call :3:list
//  test8 call :4:list
//  test8 call #########Cat2
//  test8 call :0:list
//  test8 call :1:list
//  test8 call :2:list
//  test8 call :3:list
//  test8 call :4:list

五、线程控制使用

以上的测试例子中,事件的发起和消费都是在同一个线程执行的,等同于 RxJava 是同步执行(学习使用),但是,观察者模式本身就是等触发后才通知其他订阅者,所以来了解下 RxJava 如何使用线程的。

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe()
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

  1. Scheduler 的 API (一)

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    ImageView显示图片:

    Observable.just(R.drawable.ic_launcher).map(new Func1<Integer, Drawable>() {

     @Override
     public Drawable call(Integer integer) {
         return getResources().getDrawable(integer);
     }
     }).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
         .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
     .subscribe(new Observer<Drawable>() {
     @Override
     public void onCompleted() {

     }

     @Override
     public void onError(Throwable e) {
         Toast.makeText(MainActivity.this, "Error!", Toast.LENGTH_SHORT).show();
     }

     @Override
     public void onNext(Drawable drawable) {
         mImageView.setImageDrawable(drawable);
     }
     });
多次切换线程
 Observable.just(1, 2, 3, 4)
 .subscribeOn(Schedulers.newThread()) // 指定 subscribe() 发生在 IO 线程
 .observeOn(Schedulers.io()) // 指定在IO线程处理
 .map(new Func1<Integer, Integer>() {

     @Override
     public Integer call(Integer integer) {
         return integer+1;
     }
 })
 .observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
 .subscribe(new Action1<Integer>() {
     @Override
     public void call(Integer number) {
         Log.d(TAG, "number:" + number);
     }
 });

六、参考

七、小结

致此,我们需要明白和了解以下几点:

  • 观察者模式是如何使用
  • RxJava 中的观察者模式
  • RxJava 基本语法
  • RxJava 操作符学习
  • RxJava 线程控制

练习代码已上传 WFRxJavaDemo,欢迎指正。

相关文章

  • RxJava语法练习

    很早之前就听Rxjava 多么神奇,各种评论都有,说学习坡度高,难等等,于是亲自上手试试,说实话刚开始看到这些用法...

  • RxJava2.X-buffer

    一、RxJava-buffer语法练习 日志 总结 1、buffer(int count, int skip) c...

  • RxJava2.x-timer

    一、RxJava-timer语法 日志 总结 1、timer(long delay, TimeUnit unit)...

  • RxJava学习笔记(二)

    本文是参照吴小龙同学的博客写的RxJava练习 RxJava的世界里,我们有四种角色: Observable(被观...

  • python django开发教程 & 机器学习

    title: python语法练习 参考阮一峰等多个文件用来练习python基本语法 [TOC] import文件...

  • 语法练习

    1019 一.1.どこ 2.なに 3.なん 4.何 5.いつ 6.だれ 二.1.いいえ 今朝は新聞を読ませんで...

  • 语法练习

    1024 今日、私()美智子()二人()デパート()行きました。日曜日です()()、人()沢山()ます。町()大変...

  • 语法练习

    1021 一,1.休みませんでした 2.土曜日は休みです 3.働きません 4.勉强しました 二,1.いつ 2.何番...

  • 语法练习

    语法练习 请把括号里的动词用正确的形式填入空格:be为am; is; are的统称 Passage 1: I th...

  • Retrofit+Rxjava-CallAdapterFacto

    练习地址 FanChael/RxNet 上一篇我们瞄了瞄MonkeyLei:Retrofit+Rxjava-Con...

网友评论

本文标题:RxJava语法练习

本文链接:https://www.haomeiwen.com/subject/dckbuttx.html