美文网首页
RxJava架构设计

RxJava架构设计

作者: 佼佼者Mr | 来源:发表于2020-03-17 21:06 被阅读0次

    RxJava使用系列

    RxJava概念与观察者设计模式

        1.起点 和 终点,一旦满足 起点 和 终点 这样的需求,都可以使用RxJava来实现。

        2.标准中的观察者设计模式,一个被观察者 ---- 多个观察者 多次注册。

        3.RxJava是改装的观察者设计模式,一个订阅(注册) 一个观察者。

    RxJava上游和下游

        1.上游 Observable 被观察者, 下游 Observer 观察者。

        2.ObservableEmitter<Integer> emitter 发射器 发射事件。

        3.拆分来写的,链式调用。

    RxJava创建型操作符

        create:使用者自己发射事件

        just 内部自己发射的,单一对象

        fromArray 内部自己发射的,数集对象

        empty:内部自己发射的 ,下游默认是Object,无法发出有值事件,只会发射 onComplete

        range:内部自己发射的,start 1 累加  count 5    最后结果:1 2 3 4 5

    RxJava变换型操作符

    上游 -------> 变换操作(往右边流向的时候,进行变换) ----------> 下游

        map 

        flatMap

        concatMap

        groupBy

        buffer

    RxJava过滤性型操作符

        filter

        take 定时器 

        distinct 过滤重复的元素

        elementAT   指定下标输出

    RxJava条件性型操作符

        all 如同if 全部为true 才是true,只要有一个false,就是false

        contains 包含

        any 同all相反,全部为false才是false,有一个为true就是true

    RxJava合并性型操作符

        startWith 两个给被观察者合并,先执行后边的被观察者

        concatWith 两个给被观察者合并,和startWith相反 

        concat 多个合并成一个 最多四个 按照顺序执行

        merge 多个合并成一个 最多四个 并列执行,并发

        zip 需要对应关系,对应执行,如果不对应会被忽略,最多九个被观察者

    RxJava异常处理操作符

    onErrorReturn 能够接受e.onError 接受异常后,中断接受上游的后续发射的事情 作用 处理记录异常 通知给下一层

    onErrorResumeNext  能够接受e.onError 接受异常后,也可以返回异常标识,返回被观察者,被观察者可以再次发射事件给下游

    onExceptionResumeNext   能在发生异常的时候(throw Exception)保持程序的健壮,(这种异常一定是可以接受的,才能使用)

    retry 重试

    RxJava线程切换

    异步线程区域 上游

    Schedulers.io() 代表io流操作 网络操作 耗时操作

    Schedulers.Thread() 比较常规的,普普通通

    Schedulers.Comptation()  代表CPU大量计算所需要的线程

    给上游分配多次,只会在第一次切换,后面就不切换了(忽略)

    下游

    AndroidSchedulers.mainThread()  专门为Android main线程量身定做 

    给下游切换,每次都会去切换

    如果不配置异步线程,上游发一次,下游接受一次

    如果配置异步线程,就是异步的表现

    传统的写法四分五裂,RxJava基于事件流编程,一条线索下来

    RxJava背压模式

    Flowable 背压模式 什么情况下使用     如果没有背压模式,上游不停发射线索,下游处理不过来,就会造成内存泄漏

    策略

    1.Error 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,抛出异常

    2.Buffer 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,等待下游来处理

    3.Drop 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,把后面的丢弃

    4.Latest 上游不停发射消息,下游阻塞,放入缓存池,如果池子满了,只存储128个事件

    如果是同步的,下游没有调用request(),会抛异常,而如果是异步的,不执行request(),上游不会等待下游,不会抛出异常,所以外界可以调用

    RxJava背压模式  Flowable 讲解

    Observable---Observer

    Flowable---Subscriber

    c的设计和Flowable一致的,只不过Flowable增加了背压模式

    Flowable下游,Subscription可以传递给外部使用,request(),事件给下游使用

    RxJava+Retrofit(封装了OkHttp)

    Retrofit请求网络

    Retrofit返回一个结果---Observable

    最终的结果是RxJava中的被观察者 上游Observable

    RxJava泛型高级进阶

    由于RxJava大量使用泛型 

    限定   通配符 ? 上限 extends 

                              下限 super

    读写模式  上限  不可写  可读

                     下限  可写   不完全可读

    RxJava手写系列

    RxJava书写create操作符

    源码

    public class Observable {

    public Observable(ObservableOnSubscibe subscibe) {

    this.subscibe = subscibe;

        }

    private ObservableOnSubscibesubscibe;

        public static Observablecreat(ObservableOnSubscibe subscibe) {

    return new Observable(subscibe);

        }

    public void subscibe(Observer observer){

            observer.onSubscibe();

            subscibe.subscribe(observer);

        }//调用onSubscibe订阅成功,将observer传入subscibe,将执行subscibe里面的方法

    }

    public interface ObservableOnSubscibe {

    public void subscribe(Observer observer);

    }

    public interface Observer {

    public void onSubscibe();

        public void onNext(T item);

        public void onError(Throwable e);

        public void onComplete();

    }

    RxJava书写create增加读写操作符

    什么时候是读写模式  什么时候是上限、下限模式

    在方法参数中,show(Test<? extends Object>) 一定是上限和下限

     在真正使用到泛型中,一定是读写模式

    RxJava书写just操作符

    just操作符思路和流程和create是完全一致的,使用上的区别主要是create是用户自己发射,自己调用onNext(),而just是内部自己发射。

    RxJava书写map操作符

    public class Observable {

    public Observable(ObservableOnSubscibe subscibe) {

    this.subscibe = subscibe;

        }

    private ObservableOnSubscibesubscibe;

        public static Observablecreat(ObservableOnSubscibe subscibe) {

    return new Observable(subscibe);

        }

    public void subscibe(Observer observer){

    observer.onSubscibe();

            subscibe.subscribe(observer);

        }

    public Observable map(Function function){

    MapObservable mapObservable=new MapObservable(function,subscibe);

            return new Observable(mapObservable);

        }//添加map方法,链式调用返回Observable

    }

    public class MapObservableimplements ObservableOnSubscibe {

    private Functionfunction;

        private ObservableOnSubscibesource;//管理上一层

        private ObserverobserverEmitter;//管理下层

        public MapObservable() {

    }

    @Override

        public void subscribe(Observer observerEmitter) {

    this.observerEmitter = observerEmitter;

            MapObserver mapObservable =new MapObserver(observerEmitter, source, function);

            source.subscribe(mapObservable);

        }

    public MapObservable(Function function, ObservableOnSubscibe source) {

    Log.e("fsfsgsgs","MapObservable");

            this.function = function;

            this.source = source;

        }

    class MapObserverimplements Observer {

    private ObserverobserverEmitter;//管理下层

            private ObservableOnSubscibesource;//管理上一层

            private Functionfunction;

            public MapObserver(Observer observerEmitter, ObservableOnSubscibe source, Function function) {

    this.observerEmitter = observerEmitter;

                this.source = source;

                this.function = function;

            }

    @Override

            public void onSubscibe() {

    observerEmitter.onSubscibe();

            }

    @Override

            public void onNext(T item) {

    R type =function.apply(item);

                observerEmitter.onNext(type);

            }

    @Override

            public void onError(Throwable e) {

    observerEmitter.onError(e);

            }

    @Override

            public void onComplete() {

    observerEmitter.onComplete();

            }

    }

    }

    public interface Function {

    public R apply(T t);

    }

    相关文章

      网友评论

          本文标题:RxJava架构设计

          本文链接:https://www.haomeiwen.com/subject/teslkhtx.html