美文网首页
RxJava2.0初学知识回顾

RxJava2.0初学知识回顾

作者: 张正yi | 来源:发表于2018-01-15 11:31 被阅读0次

    1、RxJava的重要组成

    Observable (可观察者,即被观察者)、Observer (观察者)、subscribe (订阅)、事件。
    Observable的生命周期中有三个重要的事件,onNext(检索数据)、onCompleted(完成)和onError(发现错误)。

    Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用。

    Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

    RxJava三部曲:初始化一个Observable,初始化一个Observer,建立订阅关系

    1.1 Observable创建

    Observable<T> create(ObservableOnSubscribe<T> source)

    代码效果如下:

       @Test
        public void testObservable() {
            System.out.println("---------testObservale----------");
            // RxJava1的写法,本次使用的是Rxjava2依赖
            //Observable<Integer> observable = Observable.create(new Observable.Onsubscrible<Integer>(){});
            //创建一个Observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                 // 创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for (int i = 0; i < 5; i++) {
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            });
    
            //创建一个Observer
            Observer<Integer> observer = new Observer<Integer>() {
                /**
                * RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于        
                * RxJava1.x中的Subscription,用于解除订阅。
                 */
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("---------onSubscribe:" + d);
                }
    
                @Override
                public void onNext(Integer integer) {
                    // 收到数据
                    System.out.println("---------onNext:" + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("---------onError----------");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("---------onComplete----------");
                }
            };
    
            // 建立连接(订阅)
            observable.subscribe(observer);
        }
       运行结果如下:
        ---------testObservale----------
        ---------onSubscribe:null
        ---------onNext:0
        ---------onNext:1
        ---------onNext:2
        ---------onNext:3
        ---------onNext:4
        ---------onComplete----------
    

    1.2 Observable的其他创建方式

    1.2.1 fromIterable()

    使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

    代码效果如下:

       @Test
        public void testObservable2() {
            List<String> lists = new ArrayList<String>();
            for (int i = 0; i < 5; i++) {
                lists.add("Hello Boy" + i);
            }
            Observable observable = Observable.fromIterable((Iterable<String>) lists);
            //创建一个Observer
            Observer<String> observer = new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                }
                @Override
                public void onNext(String str) {
                    System.out.println("---------onNext:" + str);
                }
                @Override
                public void onError(Throwable e) {
                    System.out.println("---------onError----------");
                }
                @Override
                public void onComplete() {
                    System.out.println("---------onComplete----------");
                }
            };
            // 建立连接(订阅)
            observable.subscribe(observer);
        }
        运行结果如下:
         ---------onNext:Hello Boy0
         ---------onNext:Hello Boy1
         ---------onNext:Hello Boy2
         ---------onNext:Hello Boy3
        ---------onNext:Hello Boy4
        ---------onComplete----------
    

    1.2.2 just()

    使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。
    通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。

    代码实现如下:

       @Test
        public void testObservable3() {
            List<String> lists = new ArrayList<String>();
            for (int i = 0; i < 5; i++) {
                lists.add("Hello Boy" + i);
            }
            Observable observable = Observable.just(lists);
            //创建一个Observer
            Observer<List<String>> observer = new Observer<List<String>>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println(d);
                }
    
                @Override
                public void onNext(List<String> str) {
                    System.out.println("---------onNext:" + str);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("---------onError----------");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("---------onComplete----------");
                }
            };
            // 建立连接(订阅)
            observable.subscribe(observer);
        }
        // 运行结果如下:
         Disposable:0
          ---------onNext:[Hello Boy0, Hello Boy1, Hello Boy2, Hello Boy3, Hello Boy4]
          ---------onComplete----------
    

    1.2.3 其它

    defer():当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。

    interval( ):固定的时间间隔发射一个无限递增的整数序列

    range( ):创建一个发射特定整数序列的Observable,接收两个参数,第一个参数是范围的起始值,第二个参数是范围的数据数目

    timer( ):创建一个Observable,它在一个给定的延迟后发射一个特殊的值

    repeat( ):创建一个Observable,该Observable的事件可以重复调用。

    2、RxJava中的操作符

    2.1、map()操作符

    map 操作符是可以将返回的数据变换成别的数据。一般对服务器端返回结果处理

    具体看代码实现效果:

      @Test
        public void testObservable4() {
            List<String> lists = new ArrayList<String>();
            for (int i = 0; i < 5; i++) {
                lists.add("Hello Boy" + i);
            }
            Observable observable = Observable.fromIterable((Iterable<String>) lists);
            //创建一个Observer
            Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("Disposable:" + d);
                }
                @Override
                public void onNext(String str) {
                    System.out.println("---------onNext:" + str);
                }
                @Override
                public void onError(Throwable e) {
                    System.out.println("---------onError----------");
                }
                @Override
                public void onComplete() {
                    System.out.println("---------onComplete----------");
                }
            };
            Function fun1 = new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    // 这个可以对返回的数据格式进行修改(比如我在每个前面加入"zyzhang")
                    return "zyzhang:" + s;
                }
            };
            // 建立连接(订阅)
            observable.map(fun1).subscribe(observer);
        }
        // 运行结果如下:
        ---------onNext:zyzhang:Hello Boy0
        ---------onNext:zyzhang:Hello Boy1
        ---------onNext:zyzhang:Hello Boy2
        ---------onNext:zyzhang:Hello Boy3
        ---------onNext:zyzhang:Hello Boy4
        ---------onComplete----------
    

    2.2、flatMap()操作符

    对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。

    比如有如下两个实体类:

        class Student {
            private String name;//姓名
            private List<Course> coursesList;//所修的课程
            ...
        }
        class  Course {
            private String name;//课程名
            private String id;
            ...
        }
    

    我们需要学生所修的课程名(课程可以由多个)。如果直接用map实现的话,很简单得到学生的课程列表,然后一个for循环打印学生的课程信息。篇幅关系不再举例。
    flatMap实现如下:

      @Test
        public void testObservable5() {
            List<Student> students = new ArrayList<Student>();
            for (int i = 0; i < 5; i++) {
                Student student = new Student();
                student.setName("张三" + i);
                List<Course> courses = new ArrayList<Course>();
                for (int j = 0; j < 2; j++) {
                    Course course = new Course();
                    course.setName("数学" + j);
                    courses.add(course);
                    student.setCoursesList(courses);
                }
                students.add(student);
            }
    
            Observable observable = Observable.fromIterable(students);
            //创建一个Observer
            Observer<Course> observer = new Observer<Course>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println(d);
                }
    
                @Override
                public void onNext(Course course) {
                    System.out.println("---------onNext:" + course.getName());
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("---------onError----------");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("---------onComplete----------");
                }
            };
    
            Function fun1 = new Function<Student, Observable<Course>>() {
    
                @Override
                public Observable<Course> apply(Student student) throws Exception {
                    System.out.println(student.getName());
                    return Observable.fromIterable(student.getCoursesList());
                }
            };
    
            // 建立连接(订阅)
            observable.flatMap(fun1).subscribe(observer);
        }
      // 运行结果如下:
      张三0
      ---------onNext:数学0
      ---------onNext:数学1
      张三1
      ---------onNext:数学0
      ---------onNext:数学1
      张三2
      ---------onNext:数学0
      ---------onNext:数学1
      张三3
      ---------onNext:数学0
      ---------onNext:数学1
      张三4
      ---------onNext:数学0
      ---------onNext:数学1
      ---------onComplete----------
    

    2.3、filter()操作符

    集合进行过滤

    2.4、take()操作符

    取出集合中的前几个

    2.5、doOnNext()操作符

    此操作符可以在消费者也就是观察者 接收到数据之前做事

    3、compose()操作符

    4、subscribeOn() 和observeOn()

    对线程进行控制,subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。

      Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d("所在的线程:",Thread.currentThread().getName());
                Log.d("发送的数据:", 1+"");
                e.onNext(1);
            }
         }).subscribeOn(Schedulers.io()) 
              .observeOn(AndroidSchedulers.mainThread()) 
              .subscribe(new Consumer<Integer>() {
                   @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("所在的线程:",Thread.currentThread().getName());
                        Log.d("接收到的数据:", "integer:" + integer);
                    }
               });
    // 运行结果
    // 所在的线程:RxCachedThreadScheduler-1
    // 发送的数据:1
    // 所在的线程: main
    // 接收到的数据:integer:1
    

    可以看到,Observable(被观察者)发送事件的线程的确改变了,而Observer(观察者)仍然在主线程中接收事件。由此我们实现了线程调度的操作,可以在此基础上尽情的进行异步操作。

    相关文章

      网友评论

          本文标题:RxJava2.0初学知识回顾

          本文链接:https://www.haomeiwen.com/subject/cbjroxtx.html