美文网首页
RxJava点点滴滴

RxJava点点滴滴

作者: kevinsEegets | 来源:发表于2020-01-13 15:17 被阅读0次

响应式编程是一种关注数据流和变化传递的异步编程方式

什么是数据流?

数据流被称为流处理
传统上的程序指的是按照特定的顺序执行的一系列操作,通常称为控制流
数据流编程强调数据的传递,并将程序构建为一系列的连接。数据库,用户输入,数组,网络请求都属于数据流

什么是变化传播?

一种数据流输入,经过一系列的操作之后转换为另外一个数据流,然后再分发给各个订阅者

什么是异步编程?

传统的编程方式是顺序执行的,必须在完成了上一个任务之后才能执行下一个任务。
比如我们有五个接口,个别接口有相互依赖关系,我们以前的做法就是顺序执行,一个一个执行完毕之后最 后才进行UI界面刷新,这样导致了程序依赖性大大增强并且程序可读性很差“,我们想让其响应速度升级应该 怎么做?我们可以把同步执行换成异步执行,这样的优点是,当你的任务不依赖彼此时,我们可以同时启动多 个任务,而不是等到执行完一个任务再执行下一个

什么是RxJava?

  • Reactive Extensions
  • 观察者模式
  • 迭代器模式(聚合对象,不触碰对象本身,游标(Cursor)模式)
  • 函数式编程

为什么要用RxJava?

  • 线程方便切换
  • 链式调用更易于代码阅读
  • 使用Lambda表达式简化代码量
  • 解耦单一,不嵌套
  • 强大的操作符

我们看一个经典的例子

在子线程中把某个文件夹里面的以 png 结尾的图片文件解析出来,交给 UI 线程进行渲染

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

我们用RxJava修改一下上述代码,如下

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

我们从最基本开始看看

val observable = Observable.create(object :ObservableOnSubscribe<Int> {
    override fun subscribe(it: ObservableEmitter<Int>?) {
        it?.onNext(1)
        it?.onNext(2)
        it?.onComplete()
    }
})
val observer = object: Observer<Int> {
    override fun onComplete() {
        debugMsg("onComplete")
    }
    override fun onSubscribe(d: Disposable?) {
        debugMsg("onSubscribe")
    }
    override fun onNext(value: Int?) {
        debugMsg("==",value)
    }
    override fun onError(e: Throwable?) {
        debugMsg("onError")
    }
}
observable.subscribe(observer)

RxJava创建操作符

create 从头创建一个Observable

from 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable

just 将一个或多个对象转换成发射这个或这些对象的一个Observable

timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>

empty 创建一个什么都不做直接通知完成的Observable

error 创建一个什么都不做直接通知错误的Observable

never 创建一个什么都不做的Observable

interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>

range 创建一个发射指定范围的整数序列的Observable<Integer>

defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable

create操作符
Observable.create(ObservableOnSubscribe<Int?> { emitter ->
            emitter.onNext(1)
            emitter.onNext(2)
            emitter.onNext(3)
        }).map(Function<Int?, String?> { integer -> "This is new result $integer" })
            .subscribe { t -> debugMsg("msg--->$t") }
empty, error, never操作符
Observable observable1=Observable.empty();//直接调用onCompleted。
 
Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常

Observable observable3=Observable.never();//啥都不做
timer操作符 实现一个延时操作,类似Handler postDelay
Observable.timer(3000, TimeUnit.MILLISECONDS)
            .subscribe(object : Consumer<Long?> {
                override fun accept(t: Long?) {
                   debugMsg("msg--->$t")
                }
            })
interval操作符 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(object : Consumer<Long?> {
                override fun accept(t: Long?) {
                    debugMsg("msg--->$t")
                }
            })
//打印
.MainActivity$rxjavaInterval$1;msg--->0
.MainActivity$rxjavaInterval$1;msg--->1
.MainActivity$rxjavaInterval$1;msg--->2
.MainActivity$rxjavaInterval$1;msg--->3
.MainActivity$rxjavaInterval$1;msg--->4
.MainActivity$rxjavaInterval$1;msg--->5

过滤操作符

skip 跳过操作

filter 过滤数据

ofType 过滤指定类型的数据

take 只发射开始的N项数据或者一定时间内的数据

takeFirst 提取满足条件的第一项

takeLast 只发射最后的N项数据或者一定时间内的数据

throttleFirst 定期发射Observable发射的第一项数据

debounce 去抖动

distinct 过滤重复数据

distinctUntilChanged 过滤掉连续重复的数据

timeout 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable

debounce 去抖动过滤操作符 可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效
Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(1_500);
    emitter.onNext("B");

    Thread.sleep(500);
    emitter.onNext("C");

    Thread.sleep(250);
    emitter.onNext("D");

    Thread.sleep(2_000);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .debounce(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.println("onComplete"));

打印:A D E onComplete

上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。

blockingSubscribe可以阻塞当前线程,等待执行完成(https://stackoom.com/question/3f7BX/Rxjava-blockingSubscribe-vs-subscribe

take过滤操作符
   Observable.just(2, 3, 4, "5", 6)
            .take(4)
            .subscribe{
                debugMsg("msg--->$it")
            }     
//打印                                                      
MainActivity;msg--->2
MainActivity;msg--->3
MainActivity;msg--->4
MainActivity;msg--->5                                                     
distinct过滤操作符
Observable.just(2, 3, 4, 5, 4, 3, 6)
            .distinct ()
            .subscribe {
                debugMsg("msg--->$it")
            }                                                     
//打印
MainActivity;msg--->2
MainActivity;msg--->3
MainActivity;msg--->4
MainActivity;msg--->5
MainActivity;msg--->6                                           

变换操作符

map 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化

cast 在发射之前强制将Observable发射的所有数据转换为指定类型

flatMap 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并

flatMapIterable 和flatMap的作用一样,只不过生成的是Iterable而不是Observable

concatMap 类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射

scan 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。

groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。

buffer 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

window 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。

map变换操作符 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化

image.png
        Observable.create(ObservableOnSubscribe<Int?> { emitter ->
            emitter.onNext(1)
            emitter.onNext(2)
            emitter.onNext(3)
        }).map(Function<Int?, String?> { integer -> "This is new result $integer" })
            .subscribe { t -> debugMsg("msg--->$t") }
//打印
MainActivity;msg--->This is new result 1
MainActivity;msg--->This is new result 2
MainActivity;msg--->This is new result 3 

flatMap变换操作符 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并

image.png
  Observable.create(ObservableOnSubscribe<Int?> { emitter ->
            emitter.onNext(1)
            emitter.onNext(2)
            emitter.onNext(3)
        }).flatMap(object : Function<Int?, ObservableSource<String?>> {
            override fun apply(t: Int): ObservableSource<String?> {
                val list: MutableList<String> = ArrayList()
                for (i in 0..2) {
                    list.add("I am flatMap value $t")
                }
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS)
            }

        }).subscribe { t -> debugMsg("msg--->$t") }
//打印
MainActivity;msg--->I am flatMap value 2
MainActivity;msg--->I am flatMap value 2
MainActivity;msg--->I am flatMap value 3
MainActivity;msg--->I am flatMap value 3
MainActivity;msg--->I am flatMap value 1
MainActivity;msg--->I am flatMap value 1

错误 / 重复操作符

onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable

onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常

onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据

onErrorReturnItem 当原始Observable在遇到错误时发射一个特定的数据

retry 当原始Observable在遇到错误时进行重试。

retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable

onErrorResumeNext操作符 当原始Observable在遇到错误时,使用备用Observable
image.png
val list = arrayOf(1, 2, 3, 4, 5)
        Observable.create(ObservableOnSubscribe<Int> { emitter ->
            list.forEach {
                if (it == 3) emitter.onError(Throwable())
                else emitter.onNext(it)
            }
        }).onErrorResumeNext(Function<Throwable, ObservableSource<Int?>> { throwable ->
                //拦截到错误之后,重新定义了被观察者
                Observable.just(100)
            })
            .subscribe{
                debugMsg("msg--->$it")
            }
//打印
MainActivity;msg--->1
MainActivity;msg--->2
MainActivity;msg--->100
onErrorReturnItem操作符 当原始Observable在遇到错误时发射一个特定的数据
val list = arrayOf(1, 2, 3, 4, 5)
        Observable.create(ObservableOnSubscribe<Int> { emitter ->
            list.forEach {
                if (it == 3) emitter.onError(Throwable())
                else emitter.onNext(it)
            }
        }).onErrorReturnItem(100)
            .subscribe{
                debugMsg("msg--->$it")
            }
//打印
MainActivity;msg--->1
MainActivity;msg--->2
MainActivity;msg--->100

合并操作符 按顺序连接多个Observables

concat 按顺序连接多个Observables

startWith 在被观察发送元素之前追加数据或者追加新的被观察者

merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接

zip 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数

concat合并操作符 按顺序连接多个Observables
image.png
Observable<String> names = Observable.just("1", "2");
Observable<String> otherNames = Observable.just("3", "4","5");
names.concat(otherNames).subscribe(item -> Log.d(TAG,item));

//打印:
RxJava: 1
RxJava: 2
RxJava: 3
RxJava: 4
RxJava: 5
startWith合并操作符 在被观察发送元素之前追加数据或者追加新的被观察者
image.png
Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));

//打印:
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo
merge合并操作符 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接
image.png
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");

Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));

//也可以是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));

//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8

merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。

Observable<String> names = Observable.just("Hello", "world"); 
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(    
                            new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
    name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
    
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!
zip合并操作符 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数
image.png
 val observable1: Observable<Int?>? = Observable.create(ObservableOnSubscribe<Int?> { emitter ->
                emitter.onNext(1)
                emitter.onNext(2)
                emitter.onNext(3)
                emitter.onNext(4)
                emitter.onComplete()
            }).subscribeOn(Schedulers.io())

        val observable2: Observable<String?>? = Observable.create(ObservableOnSubscribe<String?> { emitter ->

                emitter.onNext("A")
                emitter.onNext("B")
                emitter.onNext("C")
                emitter.onComplete()
            }).subscribeOn(Schedulers.io())

        Observable.zip(
            observable1,
            observable2,
            object : BiFunction<Int?, String?, String?> {
                override fun apply(t1: Int, t2: String): String {
                   return t1.toString() + t2
                }

            }).subscribe(object : Consumer<String?> {

            override fun accept(value: String?) {
                debugMsg("msg--->$value")
            }
        })
//打印
.MainActivity$rxjavaZip$2;msg--->1A
.MainActivity$rxjavaZip$2;msg--->2B
.MainActivity$rxjavaZip$2;msg--->3C

如果想要合并更多的T,我们可以看如下Function函数,最多包含九个T

image.png

相关文章

  • RxJava点点滴滴

    响应式编程是一种关注数据流和变化传递的异步编程方式 什么是数据流? 数据流被称为流处理传统上的程序指的是按照特定的...

  • 无标题文章

    [TOC] 什么是rxjava 什么是rxjava 什么是rxjava 什么是rxjava 什么是rxjava 什...

  • RxJava2

    一、RxJava GitHub: RxJava2Demo 二、RxJava的概念 RxJava RxAndroid...

  • rxjava2+retorfit.md

    [TOC] ## 什么是rxjava ## 什么是rxjava ## 什么是rxjava ## 什么是rxjava...

  • RxJava学习笔记

    RxJava Rxjava的GitHub官网上是这样介绍rxjava的:RxJava is a Java VM i...

  • Retrofit 与 RxJava 结合使用出现的异常处理

    RxJava1 与 RxJava2 RxJava 有 RxJava1 和 RxJava2,两者不能共存。 如果AP...

  • RxJava2 的原理浅析

    RxJava RxJava 的依赖 reactive-streams 是rxjava2的唯一依赖,rxjava用到...

  • Android开发(48) rxjava 入门篇

    什么是 rxJava? 特性 rxJava 是解决 异步问题的。 rxJava 是基于事件机制的。 rxJava ...

  • Rxjava2.0

    Rxjava demogithub 搜索 : RxJava2-Android-Samples Rxjava 教...

  • RxJava 2.x 原理剖析

    一、RxJava 简介: 1.RxJava 定义: 2.RxJava 架构: 二、RxJava 2.x 环境搭建(...

网友评论

      本文标题:RxJava点点滴滴

      本文链接:https://www.haomeiwen.com/subject/atogactx.html