美文网首页
Rxjava 基础原理之总结

Rxjava 基础原理之总结

作者: Cheep | 来源:发表于2019-02-28 16:11 被阅读0次

简述:

a library for composing asynchronous and event-based programs using obaservable for the
Java VM (一个对于构成使用的Java虚拟机观察序列号异步和基于事件的程序库)。

总结:随着程序逻辑变得越来越复杂,它依然能够保持简洁。

RxJava引入的目的:异步,代码更清晰

优点:采用观察者模式链式调用,简洁明了,以往实现子线程主线程切换需自己手动new Thread(推送线程池),
并且线程之间还需要使用handler进行通信,Rxjava一步到位,极其简单。

1.基础概念:

Observable(əbˈzɜ:vəbl):在观察者模式中称为“被观察者”

Observer(əbˈzɜ:və(r)):观察者模式中的“观察者”,可接收Observeable发送的数据

subscribe(səbˈskraɪb):订阅,观察者与被观察者,通过subscribe()方法进行订阅

subscriber(səbˈskraɪbə(r)):也是一种观察者,在2.0中它与Observer没什么实质的区别,不同的是Subscriber与Flowable联合使用

Flowable(f'laʊəbl):也是悲观餐者的一种,与Subscriber进行配合使用,实现背压操作

RxJava的异步实现方式:
    让Observable开启子线程执行耗时操作,完成耗时操作后,触发回调,通知Observer进行主线程UI更新。
    如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。
    RxJava中默认Observer和Observable都在同一线程执行任务。

RxJava的使用Action

2.Rxjava常用操作符

from()操作符:

接受数组或集合,返回一个按参数列表顺序发射这些数据的Observable。
源码:
public final static <T> Observable<T> from(Iterable<? extents T> iterable){
    return create(new OnSubscribeFromIterable<T>(iterable));
}
例如:
String[] array = {"Amy","Rookie","MLXG"};
Observable.from(array)
    .subscribe(new Observer<String>(){
        ...
    });

just()操作符:

接受1-9个参数,它们还可以是不同类型,返回一个按参数列表顺序发射这些数据的Observable。
例如:
    Observable.just(1,2.4,"adb")
        .subscribe(new Action1<String>(){
            ...
        });

map()操作符:

把原来的Observable对象转换成另一个Observable对象,方便Observer获得想要的数据形式,一对一 
列如:
    Observable.just("images/logo.png")              //输入类型 String
        .map(new Func1<String,Bitmap>(){
            @Verride
            public Bitmap call(String filePath){    //参数类型 String
                return getBitmapFromPath(filePath); //返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>(){
            @Override
            public void call(Bitmap bitmap){        //参数类型 bitmap
                showBitmap(bitmap)
            }
        });

flatMap()操作符:

返回任何它想返回的Observable对象,一对多 
列如:
    Student[] students = ...?;
    Subscriber<Course> subscriber = new Subscriber<Course>(){
        @Override
        public void onNext(Course course){
            ...
        }
    };
    Observable.from(students)
        .flatMap(new Func1<Student,Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
    })
    .subscribe(subscriber);

filter()操作符:

Func中对每项元素进行过滤处理,满足条件的元素才会继续发送,下面的过滤偶数。
列如:
    Observable.just(2,3,23,54,15)
        .filter(new Func1<Integer,Boolean>() {
            @Override
            public Boolean call(Integer integer){
                return integer % 2 == 0;
            }
        })
        .subscribe(new Observer<Integer>(){
            @Override
            public void onNext(Integer integer){
                ...
            }
            ...
        });

take()操作符:

输出最多指定数量的结果
列如:
    Observable.just(1,2,3,4)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .take(3)    //只发送前三个事件 
        ...

doOnNext()操作符:

用来在观察者Observer.onNext()方法调用之前进行一些初始化操作,保存/缓存网络结果
例如:
    Observable.just(1,2,3)
        .doOnNext(new Action1<Integer>(){
            @Override
            public void call(Integer integer){
                ...
            }
        })
        ...

Merge()操作符:

合并多个Observable,按照加入的Observable顺序将各个元素传递。
例如:
    Observable<Integer> obserable1 = Observable.just(2,12,34,32);
    Observable<Integer> obserable2 = Observable.just(32,12,43,2);
    Observable.merge(observable1,observable2)
            .subscribe(new Observer<Integer>(){
        ...
    });

zip()操作符:

将各个Observable个对应位置各个元素取出做操作,然后将结果传递。
例如:
    Observable<Integer> observable1 = Observable.just(1,2,3);
    Observable<Integer> observable2 = Observable.just(11,22);
    Observable.zip(observable1,observable2,newFunc2<Integer,Integer,Integer>(){
        @Override
        public Integer call(Integer integer1,Integer integer){
            return integer1+integer2;
        }   
    })
    .subscribe(new Observable<Integer>(){
        ...
        @Override
        public void onNext(Integer integer){
            //out 12、24、3
        }
    });

3.Scheduler(调度号)切换线程

Schedulers.immediate(): 
    直接在当前线程运行,相当于不指定线程,默认

Schedulers.newThread():
    总是启动新线程,并在新线程操作

Schedulers.io():
    用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;
    对于普通的计算任务,请使用Schedulers.computation();
    Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。

Schedulers.computation():
    用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量       
Schedulers.trampoline():
    当其它排队的任务完成后,在当前线程排队开始执行。

SubscribeOn\ObserveOn

subscribeOn():
    指定Observable(被观察者)所在的线程,或叫做事件产生的线程。
observeOn():
    指定Observer(观察者)所运行在的线程,或叫做事件消费的线程。

4.Fowable与Subscriber

当被观察者发射数据的速度大于观察者接收处理数据的速度,造成观察者的调度器中数据缓冲池无限堆积,
超出了缓冲池的最大容量,导致OOM.
例如:
Observable.create(new ObservableOnSubscribe<String>(){
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception{
            int a = 0;
            while(true){
                e.onNext("data:"+(i++));
            }
        }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())  
        .subscribe(new Consumer<String>(){
            @Override
            public void accept(String s) throws Exception{
            Thread.sleep(2000);
            print(s);
        }
    });

而此时Flowable的背压策略就很好的解决这个问题.
例如:

    Flowable.create(new FlowableOnSubscribe<String>(){
            @Override
            public void subcribe(@NonNull FlowableEmitter<String> e) throws Exception{
                int i = 0;
                while(true){
                    e.onNext("data:"+(i++));
                }
            }
        },BackpressureStrategy.DROP)    //超出缓冲池的数据丢弃
            .subecribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>(){
                Subscription subscription;
                @Override
                public void onSubsrcibe(Subscription s){
                    subscription = s;
                    subscription.request(1);
                }
                @Override
                public void onNext(String s){
                    try{
                        Thread.sleep(2000);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                    printThred(2);
                    subscription.request(1);    //处理完了,再请求数据
                }
                
                ...
        });

该背压策略是超出缓冲池的数据被丢弃,而观察者要求处理一个 发送我一个数据。

Backpressure的策略

a.被观察者和观察者在异步线程的情况下,如果被观察者发射事件的速度大于
  观察者接收事件的速度,就会产生Backpressure问题。
  但是同步情况下,Backpressure问题不会存在。

b.Backpressure的策略仅仅是调度Subscriber接收事件,并不影响Flowable
  发送事件。观察者可以根据自身实际情况按需拉取数据,而不是被动接收。
  最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

c.Backpressure的策略有5种:ERROR,BUFFER,DROP,LATEST,MISSING

ERROR:

用来缓存观察者处理不了暂时缓存下来的数据,缓冲池的默认大小为128,即只能缓存128个事件。
如果缓冲池溢出,就会立刻抛出MissingBackpressureException异常。

BUFFER:

即把默认容器为128的缓存池成一个大的缓存池,支持很多的数据,这种方式
比较消耗内存。

DROP:

当消费者处理不了的时候就丢弃,消费者通过request传入其需求n(事件个数),
然后生产着把n个事件传递给消费者供其消费,其他消费不掉的丢弃。

LATEST:

基本和DROP一致,消费者通过request传入需求n,然后生产者把n个事
件传递给消费者供其消费,其他消费不掉的事件就丢弃。
唯一区别是LATEST总能使消费者能够接收到生产者产生的最好一个事件。

MISSING:

没有缓冲池,接收第一个数据之后,后面的都丢弃。

相关文章

  • Rxjava 基础原理之总结

    简述: RxJava引入的目的:异步,代码更清晰 1.基础概念: RxJava的使用Action 2.Rxjava...

  • Android面试复习

    想到什么就记什么吧 java基础篇 HashMap实现原理及源码分析 RXjava RXJava的好处:(1)简洁...

  • RxJava详解之执行原理(四)

    RxJava详解之执行原理(四) 前面几篇文章介绍了RxJava的基本使用,也说了RxJava的优缺点。下面我们就...

  • RxJava详解之操作符执行原理(五)

    RxJava详解之操作符执行原理(五) 上一篇文章介绍了RxJava的执行原理。下面继续介绍一下操作符的执行原理,...

  • Hystrix Command执行以及熔断机制整理

    这几天在看Hystrix的一些实现,里面大量运用了rxjava的原理,将代码简化到了极致,对于有rxjava基础的...

  • RxJava基础总结

    1.本文仅基于RxJava2.0、Retrofit2.0(引入背压) 当下Rxjava,Retrofit已成项目标...

  • 三大框架原理

    RxJava原理可总结为: 被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 ...

  • RxJava——基础学习(三),简单实践

    RxJava——基础学习(一)RxJava——基础学习(二)前两篇基础学习,了解了RxJava最基础的部分知识。这...

  • RxJava2初探

    1.RxJava概念及原理 RxJava – Reactive Extensions for the JVM – ...

  • Retrofit 基础原理之总结

    简述: 1.网络请求: 2.网络请求流程梳理: 上述2角色解析 3.网络请求适配器: 4.网络请求执行器: 5.数...

网友评论

      本文标题:Rxjava 基础原理之总结

      本文链接:https://www.haomeiwen.com/subject/ktohuqtx.html