美文网首页
rxJava学习笔记

rxJava学习笔记

作者: Allen_tong | 来源:发表于2016-11-07 15:06 被阅读55次

    rxJava日常学习

    • 简述
      rxjava基本概念:Observable(被观察者),Observer(观察者),Subscribe(订阅)
      observable和observer通过subscribe()方法实现订阅关系,从而observable可以在需要的时候发出事件来通知observer.
      与传统的回调方法不同的是除了onNext(相当于onClick())之外,还定义了两个特殊事件:onCompleted() 和 onError().

    • onCompleted():事件队列完结。rxjava不仅把每个事件单独处理,还把它们看作是一个队列。rxjava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为完结的标示。

    • onError():事件队列异常。在事件的处理过程中,出现异常时,onError会被触发,同时队列自动终止,不会再有事件发出。

    • 在一个运行的事件序列中,onComplted()和onError()有且只有一个会被调用,而且是事件序列的最后一个。

    • 观察者的创建
      rxjava中观察者observer接口的实现方式为:

    Observer<String> observer = new Observer<String>() {  
      @Override    public void onCompleted() {        
          //end flag    
    }    
    @Override    public void onError(Throwable e) {   
         //report error 
       }    
    @Override    public void onNext(String s) {     
       //do something   
     }};
    

    除了这个接口外,rxjava还内置了一个实现Observer的抽象类:Subscriber。Subscriber对Observer接口进行了一些扩展,但是基本的使用方法一致。

    Subscriber<String> mySubscriber = new Subscriber<String>() {  
      @Override    public void onCompleted() {   
       //end flag  
       }    
      @Override    public void onError(Throwable e) {   
        //report error 
       }   
      @Override    public void onNext(String o) {      
       //do something   
       }};
    

    不仅在使用方式上一样,实质上在Rxjava的subscribe过程中,Observer也总是会先被转为Subscriber再使用。
    源码:

    public final Subscription subscribe(final Observer<? super T> observer) { 
      if (observer instanceof Subscriber) {
         return subscribe((Subscriber<? super T>)observer);
     }
     return subscribe(new Subscriber<T>() { 
        @Override public void onCompleted() { 
          observer.onCompleted(); 
    } 
        @Override public void onError(Throwable e) { 
          observer.onError(e); 
    } 
        @Override public void onNext(T t) { 
        observer.onNext(t); 
    } });
    }
    

    它两的区别主要有两点:
    1.onStart():这是Subscriber增加的方法,主要是在subscribe()刚开始,而事件还未发送前调用,做一些准备工作,例如数据的清零或者重置,一个可选的方法,默认实现为空,如果对线程又要求,onStart()就不太适用,因为它不能指定线程,总是在subscribe发生的线程调用
    2.unsubscribe():这是Subscriber所实现的另一个接口Subscription的方法,用于取消订阅,一般在调用前,可以用isUnsubscribed()先判断一下状态,在合适的地方调用unSubscribe(),来接触引用关系,以避免内存泄露。

    • 创建Observable
      被观察者,它决定什么时候触发事件和触发怎样的事件,rxjava使用create()创建一个observable.
    Observable<String> myObservable = Observable.create(
          new Observable.OnSubscribe<String>() {    
            @Override    
            public void call(Subscriber<? super String> subscriber) {    
             subscriber.onNext("Hello rxjava");       
             subscriber.onCompleted();   
     }});
    

    这里传入了一个OnSubscribe对象作为参数,OnSubscribe会被存储在返回的Observable对象中,它的作用相当于一个计划表,当observable被订阅的时候,onSubscribe的call()方法会自动被调用,事件序列就会依照设定依次出发,这样由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式

    create()方法是Rxjava最基本的创造事件的方法,基于这个方法Rxjava还提供了两个快捷创造事件队列的方法。

    1. just(T...) :将传入的参数依次发出
    2. from(T [] )/from(Iterable<? extends T>) : 将传入的数组或Iterable拆成具体对象后,依次发出。
      subscribe()还支持不完整定义的回调,Rxjava会自动根据定义创建出Subscriber.形式如下:
    Action0 action0 = new Action0() {  
        @Override   
       public void call() {      
        //onComplted   
     }};
    
    Action1<String> action1 = new Action1<String>() {    
      @Override   
       public void call(String s) {    
        //OnNext  
      }} ;
    Action1<Throwable> action11 = new Action1<Throwable>() { 
       @Override    
      public void call(Throwable throwable) {     
       //onError   
     }};
    //自动创建subscriber,并使用action1来定义onNext
    myObservable.subscribe(action1);
    //自动创建subscriber,并使用action11来定义onError
    myObservable.subscribe(action1,action11);
    //自动创建subscriber,并使用action0来定义onComplted
    myObservable.subscribe(action1,action11,action0);
    

    在rxjava的默认规则中,事件的发出和消费都是在同一个线程,观察者模式本身的目的就是:后台处理,前台回调的异步机制,因此异步对Rxjava非常重要,而要实现异步,而要实现异步则需要Rxjava另一个重要概念:Scheduler.

    • 线程控制 Scheduler
    1. Scheduler的API
      1.Schedulers.immediate():默认当前线程
      2.Schedulers.newThread():总是启动新线程,并在新线程执行操作
      3.Schedulers.io(): I/O操作(读写文件,读写数据库,网络信息交互等),行为模式和Schedulers.newThread()差不多,区别在于内置来一个无数量上限等线程池,可以重用空闲线程,因此大多数情况下会使用这个,比newThread()更有效率
      4.AndroidSchedulers.mainThread():它指定操作在Android主线程运行

    有了这几个Scheduler,就可以使用subscribeon()和observeon()两个方法对线程进行控制,subscribeon():指定subscribe()所发生的线程,或者事件产生的线程,Observeon()指定Subscriber所运行的线程,或者叫做事件的消费现场。

    Observable.just(1,2,3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {   
     @Override  
      public void call(Integer integer) {     
       Log.e("number", "number" + integer); 
       }}
    );
    

    这种在subscribe()前加上 subscribeOn() 和 observerOn() 的使用方法,它多适用于 ** 后台线程取数据,主线程显示 ** 的程序策略。

    2016-11-07 立冬

    • 变换
      Rxjava提供了对事件序列进行变换的支持,所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列。

    example:

    Observable.just("images.logo.png")
    .map(new Func1<String, Bitmap>() { //输入类型 String    
          @Override    
          public Bitmap call(String s) {   //参数类型 String        
            return getBitmapfromPath(s);  //返回类型 bitmap    
            }}
            ).subscribe(new Action1<Bitmap>() {        
        @Override    
          public void call(Bitmap bitmap) { //参数类型bitmap  
                      showBitmap(bitmap);    }}
    );
    

    这里出现的Func1的类,和Action1非常相似,也是Rxjava的一个接口,用于包装含有一个参数的方法。Func1和Action的区别在于,Func1包装的是有返回值的方法,另外,和Actionx一样,Funcx也有多个,用于不同参数个数的方法。

    ** map()是一对一的转换,flatMap()是一对多的转换**

    map example:

    Student[] students = ...;
    Subscriber<String> subscriber = new Subscriber<String>() {       @Override 
    public void onNext(String name) { 
    Log.d(tag, name); } ...
    };
    
    Observable.from(students) .map(new Func1<Student, String>() { 
    @Override public String call(Student student) { 
    return student.getName(); } }
    ) .subscribe(subscriber);
    

    flatMap example:

    Student[] students = ...;
    Subscriber<Course> subscriber = new Subscriber<Course>() { 
      @Override 
      public void onNext(Course course) { 
        Log.d(tag, course.getName()); 
    } ...};
    Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() {
       @Override 
      public Observable<Course> call(Student student) { 
        return Observable.from(student.getCourses()); } }) 
    .subscribe(subscriber);
    

    flapMap()和map()有一个相同点:把传入点参数转化之后返回另一个对象。不同的是,flapMap()中返回的是个observable对象,并且这个observable对象并不是被直接发送到了Subscriber的回调方法中,原理为:1.使用传入的事件对象创建一个observable对象
    2.并不发送这个observable,而是将它激活,于是它开始发送事件
    3.每一个创建出来的observable发送的事件,都被汇入同一个observable,而这个observable负责将这些事件统一交给Subscriber的回调方法

    • 变换的原理: lift()
      这些变化虽然功能上各有不同,但实质上都是针对事件序列的处理和再发送

    它们基于同一个基础的变化方法:lift(Operator)

    • 线程控制:Scheduler(二)
      可以使用observeOn()实现多次线程切换,observeOn()指定的是它之后的操作所在的线程。

    与Subscriber.onStart()相对应,有个方法Observable.doOnSubscribe(),同Subscriber.onStart()同样是在subscribe()调用后而且在事件发送前执行,区别在于它可以指定线程,默认情况下,doOnSubscribe()执行在subscribe()发生的线程,而如果在doOnSubscribe()之后有subscribeOn()的话,它将执行离它最近的subscribeOn()所指定到的线程。

    参考:
    抛物线Rxjava详解

    相关文章

      网友评论

          本文标题:rxJava学习笔记

          本文链接:https://www.haomeiwen.com/subject/gmcguttx.html