美文网首页
RxJava点点滴滴

RxJava点点滴滴

作者: kevinsEegets | 来源:发表于2020-01-13 15:17 被阅读0次

    响应式编程是一种关注数据流和变化传递的异步编程方式

    什么是数据流?

    数据流被称为流处理
    传统上的程序指的是按照特定的顺序执行的一系列操作,通常称为控制流
    数据流编程强调数据的传递,并将程序构建为一系列的连接。数据库,用户输入,数组,网络请求都属于数据流

    什么是变化传播?

    一种数据流输入,经过一系列的操作之后转换为另外一个数据流,然后再分发给各个订阅者

    什么是异步编程?

    传统的编程方式是顺序执行的,必须在完成了上一个任务之后才能执行下一个任务。
    比如我们有五个接口,个别接口有相互依赖关系,我们以前的做法就是顺序执行,一个一个执行完毕之后最 后才进行UI界面刷新,这样导致了程序依赖性大大增强并且程序可读性很差“,我们想让其响应速度升级应该 怎么做?我们可以把同步执行换成异步执行,这样的优点是,当你的任务不依赖彼此时,我们可以同时启动多 个任务,而不是等到执行完一个任务再执行下一个

    什么是RxJava?

    • Reactive Extensions
    • 观察者模式
    • 迭代器模式(聚合对象,不触碰对象本身,游标(Cursor)模式)
    • 函数式编程

    为什么要用RxJava?

    • 线程方便切换
    • 链式调用更易于代码阅读
    • 使用Lambda表达式简化代码量
    • 解耦单一,不嵌套
    • 强大的操作符

    我们看一个经典的例子

    在子线程中把某个文件夹里面的以 png 结尾的图片文件解析出来,交给 UI 线程进行渲染

    new Thread() {
        @Override
        public void run() {
            super.run();
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        getActivity().runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                imageCollectorView.addImage(bitmap);
                            }
                        });
                    }
                }
            }
        }
    }.start();
    

    我们用RxJava修改一下上述代码,如下

    Observable.from(folders)
        .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
        .filter((Func1) (file) -> { file.getName().endsWith(".png") })
        .map((Func1) (file) -> { getBitmapFromFile(file) })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
    

    我们从最基本开始看看

    val observable = Observable.create(object :ObservableOnSubscribe<Int> {
        override fun subscribe(it: ObservableEmitter<Int>?) {
            it?.onNext(1)
            it?.onNext(2)
            it?.onComplete()
        }
    })
    val observer = object: Observer<Int> {
        override fun onComplete() {
            debugMsg("onComplete")
        }
        override fun onSubscribe(d: Disposable?) {
            debugMsg("onSubscribe")
        }
        override fun onNext(value: Int?) {
            debugMsg("==",value)
        }
        override fun onError(e: Throwable?) {
            debugMsg("onError")
        }
    }
    observable.subscribe(observer)
    

    RxJava创建操作符

    create 从头创建一个Observable

    from 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable

    just 将一个或多个对象转换成发射这个或这些对象的一个Observable

    timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>

    empty 创建一个什么都不做直接通知完成的Observable

    error 创建一个什么都不做直接通知错误的Observable

    never 创建一个什么都不做的Observable

    interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>

    range 创建一个发射指定范围的整数序列的Observable<Integer>

    defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable

    create操作符
    Observable.create(ObservableOnSubscribe<Int?> { emitter ->
                emitter.onNext(1)
                emitter.onNext(2)
                emitter.onNext(3)
            }).map(Function<Int?, String?> { integer -> "This is new result $integer" })
                .subscribe { t -> debugMsg("msg--->$t") }
    
    empty, error, never操作符
    Observable observable1=Observable.empty();//直接调用onCompleted。
     
    Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
    
    Observable observable3=Observable.never();//啥都不做
    
    timer操作符 实现一个延时操作,类似Handler postDelay
    Observable.timer(3000, TimeUnit.MILLISECONDS)
                .subscribe(object : Consumer<Long?> {
                    override fun accept(t: Long?) {
                       debugMsg("msg--->$t")
                    }
                })
    
    interval操作符 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
    Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(object : Consumer<Long?> {
                    override fun accept(t: Long?) {
                        debugMsg("msg--->$t")
                    }
                })
    //打印
    .MainActivity$rxjavaInterval$1;msg--->0
    .MainActivity$rxjavaInterval$1;msg--->1
    .MainActivity$rxjavaInterval$1;msg--->2
    .MainActivity$rxjavaInterval$1;msg--->3
    .MainActivity$rxjavaInterval$1;msg--->4
    .MainActivity$rxjavaInterval$1;msg--->5
    

    过滤操作符

    skip 跳过操作

    filter 过滤数据

    ofType 过滤指定类型的数据

    take 只发射开始的N项数据或者一定时间内的数据

    takeFirst 提取满足条件的第一项

    takeLast 只发射最后的N项数据或者一定时间内的数据

    throttleFirst 定期发射Observable发射的第一项数据

    debounce 去抖动

    distinct 过滤重复数据

    distinctUntilChanged 过滤掉连续重复的数据

    timeout 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable

    debounce 去抖动过滤操作符 可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效
    Observable<String> source = Observable.create(emitter -> {
        emitter.onNext("A");
    
        Thread.sleep(1_500);
        emitter.onNext("B");
    
        Thread.sleep(500);
        emitter.onNext("C");
    
        Thread.sleep(250);
        emitter.onNext("D");
    
        Thread.sleep(2_000);
        emitter.onNext("E");
        emitter.onComplete();
    });
    
    source.subscribeOn(Schedulers.io())
            .debounce(1, TimeUnit.SECONDS)
            .blockingSubscribe(
                    item -> System.out.print(item+" "),
                    Throwable::printStackTrace,
                    () -> System.out.println("onComplete"));
    
    打印:A D E onComplete
    

    上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。

    blockingSubscribe可以阻塞当前线程,等待执行完成(https://stackoom.com/question/3f7BX/Rxjava-blockingSubscribe-vs-subscribe

    take过滤操作符
       Observable.just(2, 3, 4, "5", 6)
                .take(4)
                .subscribe{
                    debugMsg("msg--->$it")
                }     
    //打印                                                      
    MainActivity;msg--->2
    MainActivity;msg--->3
    MainActivity;msg--->4
    MainActivity;msg--->5                                                     
    
    distinct过滤操作符
    Observable.just(2, 3, 4, 5, 4, 3, 6)
                .distinct ()
                .subscribe {
                    debugMsg("msg--->$it")
                }                                                     
    //打印
    MainActivity;msg--->2
    MainActivity;msg--->3
    MainActivity;msg--->4
    MainActivity;msg--->5
    MainActivity;msg--->6                                           
    

    变换操作符

    map 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化

    cast 在发射之前强制将Observable发射的所有数据转换为指定类型

    flatMap 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并

    flatMapIterable 和flatMap的作用一样,只不过生成的是Iterable而不是Observable

    concatMap 类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射

    scan 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。

    groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。

    buffer 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

    window 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。

    map变换操作符 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化

    image.png
            Observable.create(ObservableOnSubscribe<Int?> { emitter ->
                emitter.onNext(1)
                emitter.onNext(2)
                emitter.onNext(3)
            }).map(Function<Int?, String?> { integer -> "This is new result $integer" })
                .subscribe { t -> debugMsg("msg--->$t") }
    //打印
    MainActivity;msg--->This is new result 1
    MainActivity;msg--->This is new result 2
    MainActivity;msg--->This is new result 3 
    

    flatMap变换操作符 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并

    image.png
      Observable.create(ObservableOnSubscribe<Int?> { emitter ->
                emitter.onNext(1)
                emitter.onNext(2)
                emitter.onNext(3)
            }).flatMap(object : Function<Int?, ObservableSource<String?>> {
                override fun apply(t: Int): ObservableSource<String?> {
                    val list: MutableList<String> = ArrayList()
                    for (i in 0..2) {
                        list.add("I am flatMap value $t")
                    }
                    return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS)
                }
    
            }).subscribe { t -> debugMsg("msg--->$t") }
    //打印
    MainActivity;msg--->I am flatMap value 2
    MainActivity;msg--->I am flatMap value 2
    MainActivity;msg--->I am flatMap value 3
    MainActivity;msg--->I am flatMap value 3
    MainActivity;msg--->I am flatMap value 1
    MainActivity;msg--->I am flatMap value 1
    

    错误 / 重复操作符

    onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable

    onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常

    onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据

    onErrorReturnItem 当原始Observable在遇到错误时发射一个特定的数据

    retry 当原始Observable在遇到错误时进行重试。

    retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable

    onErrorResumeNext操作符 当原始Observable在遇到错误时,使用备用Observable
    image.png
    val list = arrayOf(1, 2, 3, 4, 5)
            Observable.create(ObservableOnSubscribe<Int> { emitter ->
                list.forEach {
                    if (it == 3) emitter.onError(Throwable())
                    else emitter.onNext(it)
                }
            }).onErrorResumeNext(Function<Throwable, ObservableSource<Int?>> { throwable ->
                    //拦截到错误之后,重新定义了被观察者
                    Observable.just(100)
                })
                .subscribe{
                    debugMsg("msg--->$it")
                }
    //打印
    MainActivity;msg--->1
    MainActivity;msg--->2
    MainActivity;msg--->100
    
    onErrorReturnItem操作符 当原始Observable在遇到错误时发射一个特定的数据
    val list = arrayOf(1, 2, 3, 4, 5)
            Observable.create(ObservableOnSubscribe<Int> { emitter ->
                list.forEach {
                    if (it == 3) emitter.onError(Throwable())
                    else emitter.onNext(it)
                }
            }).onErrorReturnItem(100)
                .subscribe{
                    debugMsg("msg--->$it")
                }
    //打印
    MainActivity;msg--->1
    MainActivity;msg--->2
    MainActivity;msg--->100
    

    合并操作符 按顺序连接多个Observables

    concat 按顺序连接多个Observables

    startWith 在被观察发送元素之前追加数据或者追加新的被观察者

    merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接

    zip 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数

    concat合并操作符 按顺序连接多个Observables
    image.png
    Observable<String> names = Observable.just("1", "2");
    Observable<String> otherNames = Observable.just("3", "4","5");
    names.concat(otherNames).subscribe(item -> Log.d(TAG,item));
    
    //打印:
    RxJava: 1
    RxJava: 2
    RxJava: 3
    RxJava: 4
    RxJava: 5
    
    startWith合并操作符 在被观察发送元素之前追加数据或者追加新的被观察者
    image.png
    Observable<String> names = Observable.just("Spock", "McCoy");
    Observable<String> otherNames = Observable.just("Git", "Code","8");
    names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));
    
    //打印:
    RxJava: Git
    RxJava: Code
    RxJava: 8
    RxJava: Spock
    RxJava: McCo
    
    merge合并操作符 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接
    image.png
    Observable<String> names = Observable.just("Hello", "world");
    Observable<String> otherNames = Observable.just("Git", "Code","8");
    
    Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));
    
    //也可以是
    //names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));
    
    //打印:
    RxJava: Hello
    RxJava: world
    RxJava: Git
    RxJava: Code
    RxJava: 8
    

    merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。

    Observable<String> names = Observable.just("Hello", "world"); 
    Observable<String> otherNames = Observable.just("Git", "Code","8");
    Observable<String> error = Observable.error(    
                                new NullPointerException("Error!"));
    Observable.mergeDelayError(names,error,otherNames).subscribe(
        name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
        
    //打印:
    RxJava: Hello
    RxJava: world
    RxJava: Git
    RxJava: Code
    RxJava: 8
    RxJava: Error!
    
    zip合并操作符 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数
    image.png
     val observable1: Observable<Int?>? = Observable.create(ObservableOnSubscribe<Int?> { emitter ->
                    emitter.onNext(1)
                    emitter.onNext(2)
                    emitter.onNext(3)
                    emitter.onNext(4)
                    emitter.onComplete()
                }).subscribeOn(Schedulers.io())
    
            val observable2: Observable<String?>? = Observable.create(ObservableOnSubscribe<String?> { emitter ->
    
                    emitter.onNext("A")
                    emitter.onNext("B")
                    emitter.onNext("C")
                    emitter.onComplete()
                }).subscribeOn(Schedulers.io())
    
            Observable.zip(
                observable1,
                observable2,
                object : BiFunction<Int?, String?, String?> {
                    override fun apply(t1: Int, t2: String): String {
                       return t1.toString() + t2
                    }
    
                }).subscribe(object : Consumer<String?> {
    
                override fun accept(value: String?) {
                    debugMsg("msg--->$value")
                }
            })
    //打印
    .MainActivity$rxjavaZip$2;msg--->1A
    .MainActivity$rxjavaZip$2;msg--->2B
    .MainActivity$rxjavaZip$2;msg--->3C
    

    如果想要合并更多的T,我们可以看如下Function函数,最多包含九个T

    image.png

    相关文章

      网友评论

          本文标题:RxJava点点滴滴

          本文链接:https://www.haomeiwen.com/subject/atogactx.html