美文网首页
十一、RxJava简析

十一、RxJava简析

作者: 小鱼你好 | 来源:发表于2022-10-13 15:05 被阅读0次

    RxJava有4个角色Observable、Observer、Subscriber和Suject,Observable和 Observer 通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。

    RxJava的使用

    1,创建Observer(观察者)

    它决定事件触发的时候有怎样的行为。

    Subscriber subscriber = new Subscriber<String>(){
        @Override
        public void onCompleted(){//事件队列完结,当不再有新的onNext触发时调用作为完成标志
        }
        @Override
        public void onError(){//在事件处理过程中出现异常时会触发同时队列终止
        }
        @Override
        public void onNext(){//将要处理的事件添加到事件队列
        }
        @Override
        public void onstart(){//在事件还未发送前调用可以做一些准备工作
        }
    }
    

    Observer是一个接口,Subscriber是在Observer上进行了扩展,也可以使用Observer创建观察者

    Observer<String> observer = new Observer<String>(){
        @Override
        public void onCompleted(){}
        @Override
        public void onError(){}
        @Override
        public void onNext(){}
    }
    

    2,创建Observable(被观察者)

    它决定了什么时候触发事件以及触发怎样的事件

    Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
        @Override
        public void call(Subscriber<? super String> subscriber){
        subscriber.onNext("123")//调用方法将事件添加到队列
        subscriber.onNext("456")
        subscriber.onCompleted()
        }
    })
    

    //简化写法利用just和from实现

    Observable observable = Observable.just("123","456")//依次调用onNext方法和onCompleted方法。
    String[] words = {"123","456"}
    Observable observable = Observable.from(words)//依次调用onNext方法和onCompleted方法。
    

    3,Subscribe(订阅)

    将观察者和被观察者进行关联observable.subscribe(subscriber)

    RxJava的Subject

    Subject 既可以是一个 Observer 也可以是一个 Observerable,它是连接 Observer 和Observerable的桥梁。 因此,Subject可以被理解为Subject=Observable+Observer
    1,PublishSubject:只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,因此为了防止数据丢失可以在所有观察者都订阅完成后在发送数据。
    2,BehaviorSubject:当Observer订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据。如果此时还没有收到 任何数据,它会发射一个默认值,然后继续发射其他任何来自原始Observable的数据。如果原始的 Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,但是会向Observer传递一个 异常通知。
    3,ReplySubject:不管Observer何时订阅ReplaySubject,ReplaySubject均会发射所有来自原始Observable的数据给 Observer。不同类型的ReplaySubject用于限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete 和onError方法。这可能会导致顺序错乱,并且违反了Observer规则。
    4,AsyncSubject:当Observable完成时,AsyncSubject只会发射来自原始Observable的最后一个数据。如果原始的 Observable 因为发生了错误而终止,AsyncSubject 将不会发射任何数据,但是会向Observer传递一个异常通知。

    RxJava操作符

    包括defer、range、interval、start、repeat、timer

    创建操作符

    1,interval
    创建一个按固定时间间隔发送整数序列的Observable,相当于定时器

    Observable.interval(6,TimeUnit.SECONDS)//间隔6秒发送
                      .subscribe(new Action1<Long>(){
                      @Override
                      public void call(Long mLong){
                          //TODO
                          } 
                      })
    

    2,range
    创建发射指定范围整数序列的Observable,可以用于代替for循环,第一个参数为起始值且不小于0,第二个参数为终值,左闭右开。

    Observable.range(0,8)
                      .subscribe((new Action1<Interger>(){
                      @Override
                      public void call(Integer integer){
                          //TODO
                          } 
                      })
    

    3,repeat
    创建一个N次重复发射特定数据的Observable

    Observable.range(0,8)
                       .repeat(3)//重复执行三次0-7循环
                      .subscribe((new Action1<Integer>(){
                      @Override
                      public void call(Integer integer){
                          //TODO
                          } 
                      })
    

    变换操作符

    包括map、flatMap、cast、concatMap、flatMapIterable、buffer、groupBy
    1,map
    通过指定一个Func对象,将Obserable转换为一个新的Observable对象并发射,观察者将接收到新的Observable处理。例如利用map来处理域名更换:

    final String Host = "http://baidu.com/"
    Observable.just("image").map(new Func1<String,String>(){
        @Override
        public String call(String s){
          return Host+s
        }
    }).subscribe(new Action1<String>(){
                      @Override
                      public void call(String s){
                          //TODO
                          } 
                      })
    

    2,flatMap、cast
    flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦的放入一个单独的Observable,cast操作符的作用是强制将Observable发射的所有数据转换为指定的类型。例如在多个请求接口前添加host:

    final String Host="http://baidu.com"
    Lsit<String> list = new ArrayList<>()
    list.add("image1")
    list.add("image2")
    list.add("image3")
    //利用flatMap将list转换为Observable集合并再放入一个单独的Observable中发射,交叉执行不保证发射顺序。
    Observable.from(list).flatMap(new Func1<String,Observable<?>>(){
        @Override
        public Observable<?> call(String s){
        return Observable.just(Host+s)
        }
    //cast将Observable的数据转换为String类型
    }).cast(String.class).subscribe(new Action1<String>(){
                      @Override
                      public void call(String s){
                          //TODO
                          } 
                      })
    

    3,concatMap
    concatMap与flatMap操作符一致,解决了flatMap的交叉问题,提供了一种能够把发射值连续在一起的函数,而不合并他们。
    4,flatMapIterable
    可以将数据包装成Iterable,我们在Iterable中对数据进行处理。

    Observable.just(1,2,3).flatMapIterable(new Func1<Integer,Iterable<Integer>>(){
        @Override
        publc Iterable<Integer> call(Integer s){
        List<Integer> mList = new ArrayList<Integer>()
        mList.add(s+1)//对每个数都进行+1,输出为2,3,4
        return mList
        }
    }).subscribe(new Action1<Integer>(){
                      @Override
                      public void call(Integer integer){
                          //TODO
                          } 
                      }
    

    5,buffer
    将原Observable变换为一个新的Observable,新的Observable每次发射一组列表值而不是一个一个发射。和其类似的有window操作符,window操作符发射的是Observable而不是数据列表

    Observable.just(1,2,3,4,5,6)
                      .buffer(3)//缓存容量为3,输出为两组每组3个数分别输出
                      .subscribe(new Action1<List<Integer>>(){
                      @Override
                      public void call(List<Interger> integers){
                          //TODO
                          } 
                      })
    

    6,groupBy
    用于元素分组,将原Observable变换为一个发射Observable的新的分组后的Observable,每一个新的Observable都是发射一组指定数据。

    Observable<GroupedObservable<String,Object>> groupObservable 
    = Observable.just(obj1,obj2,obj3)
                         .groupBy(new Func1<Object,String>){
                         @Override
                         public String call(Object obj){
                          return obj.value//用于确定分组的参数
                         }
                         }
    //对分组后的数据输出
    Observable.concat(groupObservable).subscribe(new Action1<Object>(){
                      @Override
                      public void call(Object obj){
                          //TODO
                          } 
                      })
    

    过滤操作符

    包括filter、elementAt、distinct、skip、take、skipLast、takeLast、ignoreElements、throttleFirst、sample、debounce、throttleWithTimeout

    组合操作符

    包括startWith、merge、concat、zip、combineLastest、join、switch

    辅助操作符

    包括delay、DO、subscribeOn、observeOn、timeout、materialize、dematerialize、timeInterval、timestamp、to

    错误处理操作符

    包括catch、retry

    布尔操作符

    包括all、contains、isEmpty、exists、sequenceEqual

    条件操作符

    包括amb、defaultIfEmpty、skipUntil、skipWhile、takeUnit、takeWhile

    转换操作符

    包括toList、toSortedList、toMap、toMultiMap、getIterator、nest

    RxJava线程控制

    如果不设置线程,默认为在subscribe方法的线程上进行回调,设置线程需要用到Scheduler
    Scheduler.immediate():在当前线程运行
    Scheduler.newThread():总是启用新线程
    Scheduler.io():I/O操作使用的Scheduler,和newThread类似,区别在于io内部实现的是一个无数量上限的线程池,可以重用空闲线程比newThread更有效率。
    Scheduler.computation():计算使用的模式,使用固定线程池大小为cpu核数,不要进行I/O操作其等待时间会浪费cpu资源。
    Scheduler.trampoline():在当前线程非立即执行使用,可以将任务加入队列然后按顺序运行。
    RxAndroid提供的常用的Scheduler
    AndroidSchedulers.mainThread():指定操作在主线程运行。
    在RxJava中用subscribeOn和observeOn操作符来控制线程。

    源码解析

    1,RxJava订阅过程。
    查看一段RxJava的基本使用代码。
    Observable.create(new Observable.OnSubscribe<Integer>)...
    .subscribe(new Subscriber<Integer>)...
    查看create方法的定义

    public static <T> Observable<T> create(OnSubscribe<T> f){
        return new Observable<T>(hook.onCreate(f))
    }
    

    可以看出在create方法中创建了Observable对象并返回,hook表示的是RxJavaObservableExecutionHook。查看他的onCreate方法定义

    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f){
      return f
    }
    

    在RxJavaObservableExecutionHook的onCreate方法中只是返回了传入得被观察者对象。
    查看Observable的构造方法

    protected Observable(OnSubscribe<T> f){
      this.onSubscribe = f//将前边构建的Observable对象赋值给了onSubscribe
    }
    

    之后调用subscribe方法完成订阅,查看Observable的subscrbie方法

    static <T> Subscription subscribe(Subscriber<? super T> subscriber,Observable<T> observble)
    ...
    subscriber.onStart()
    if(!(subscriber instancefo SafeSubscriber)){//进行类型检查,如果不是进行封装
    //SafeSubscriber继承自Subscriber,在 onCompleted和onError方法调用时不会再调用onNext,且保证onCompleted和onError方法只有一个执行
        subscriber = new SafeSubscriber<T>(subscriber)
    }
    try{
        hook.onSubscribeStrt(observable, observable.onSubscribe).call(subscriber)
        return hook.onSubscribeReturn(subscriber)
    }catch(Throwable e){
        ...
        return Subscriptions.unsubscribed()
    }
    

    查看hook的onSubscribeStart方法可以发现是调用OnSubscribe.call(subscriber)来完成订阅的

    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observable,final OnSubscribe<T> onSubscribe){
        return onSubscribe
    }
    

    2,RxJava的变换过程
    前边说过map操作符会将源Observable转换为一个新的Observable,查看map方法

    public final <R> Observable<R> map(Func1<? super T,? extends R>func){
    //OperatorMap实现了Operator接口的call方法,且在call方法中创建了MapSubscriber并返回
        return lift(new OperatorMap<T,R>(func))
    }
    

    lift 方法返回一个新建的 Observable 对象ob2,并传入了一个 OnSubscribeLift对象记为on2,他的构造函数中需要两个参数为onSubscribe 和 operator(OperatorMap),在OnSubscribeLift构造方法中会拿到开始creat创建的Observable(前边提过的onSubscribe变量on1),在其call方法中调用hook.onLift(operator).call(o)方法即调用的是用OperatorMap的call方法返回MapSubscriber记为sub2,继续执行可理解为on1.call(sub2)完成订阅。在map方法后调用的subscribe方法传入Subscriber类型参数标记为sub1。
    subscribe方法前面讲过,它会调用:hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber) 因为此前调用过map操作符,所以这里传入的observable.onSubscribe指的是on2。分析RxJava的订阅过程,onSubscribeStart方法会返回onSubscribe,也就是on2,相当于on2.call(sub1),on2指的是 OnSubscribeLift,在on1的call中会调用sub2的onNext查看MapSubscriber的onNext方法中调用了actual.onNext(result)方法,actual指的是sub1从而完成了变换过程。


    map转换图

    3,RxJava的线程切换过程
    线程切换主要用到了subscribeOn和observeOn两个方法,一个决定了被观察者执行线程,一个决定了观察者运行线程。

    subscribeOn方法定义:

    public final Observable<T> subscribeOn(Scheduler shceduler){
      if(this instanceof ScalarSynchronousObservable){
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler)
      }
      return create(new OperatorSubscribeOn<T>(this,scheduler))
    }
    

    上面代码create 方法仍旧是生成一个新的 Observable,并传入一个OperatorSubscribeOn 类。 OperatorSubscribeOn 需要传入两个参数:第一个参数 this,指的是我们最先创建的Observable,第二个参数是一个Scheduler。在OperatorSubcribeOn的call方法中调用Scheduler的createWorker方法会创建Worker然后调用它的schedule方法,Worker是线程处理的代理执行者。
    选取查看Schedulers.newThread()代码:

    poublic static Scheduler newThread(){
      return getInstance().newThreadSchedulere
    }
    

    Scheduler 是一个单例类,其返回自身的 newThreadScheduler 属性。这个属性最终指的是 NewThreadScheduler。

    public final class NewThreadScheduler extends Scheduler{
      private final ThreadFactory threadFactory
      public NewThreadScheduler(ThreadFactory threadFactory){
        this.threadFactory = threadFactory
      }
      @Override
       public Worker createeWorker(){
          reurn new NewThreadWorker(threadFactory)
       } 
    }
    

    此前在 OperatorSubscribeOn 中调用了 Scheduler 的 createWorker 方法,其实就是调用 NewThreadScheduler 的createWorker方法。NewThreadWorker中使用了ScheduledThreadPool线程池。OperatorSubscribeOn中调用了 Worker 的 schedule 方法,而 Worker 指的是NewThreadWorker

    public Subscription schedule(final Action0 action,long delayTime,TimeUnit unit){
      if(isUnsubscribed){
        return Subscriptions.unsubscribbed()
      }
      return scheduleActual(action,delayTime,unit)
    }
    //scheduleActual方法
    public ScheduledAction scheduleActual(final Action0 action,long delayTime,TimeUnit unit){
      Action0 decoratedAction = schedulersHook.onSchedule(action)
      ScheduledAction run = new ScheduledAction(decoratedAction)
      Future<?>f
      if(delayTime<=0){
         f=executor.submit(run)
      }else{
        f = executor.schedule(run,delayTime,unit)
      }
      run.add(f)
      return run
    }
    

    可以看到最终线程切换的处理均由线程池处理。

    observeOn方法定义

    public final Observable<T> observeOn(Scheduler scheduler,boolean delayError,int bufferSize){
      if(this instanceof ScalarSynchronousObservable){  
        return ((ScalarSynchronousObservable<T>this).scalarScheduleOn(scheduler)
      }
      return lift(new OperatorObserveOn<T>(scheduler,delayError,bufferSize)
    }
    

    OperatorObserveOn和此前讲过的OperatorMap类似在其call方法中创建了ObserveOnSubscriber

    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler,child,delayError,bufferSize)
    parent.init()
    return parent
    

    ObserveOnSubscriber是Subscriber的子类,在其onNext,onCompleted,onError方法中都调用了schedule方法。

    protected void schedule(){
        if(counter.getAndIncrement() == 0){
            recursiveScheduler.schedule(this)
        }
    }
    

    recursiveScheduler是一个Worker,this指的是ObserveOnSubscriber,这意味着ObserveOnSubscriber的 onNext方法都被切换到recursiveScheduler的线程做处理,从而达到线程切换的目的。

    相关文章

      网友评论

          本文标题:十一、RxJava简析

          本文链接:https://www.haomeiwen.com/subject/lwatzrtx.html