美文网首页
RxJava使用

RxJava使用

作者: 媚竹风 | 来源:发表于2017-05-01 22:46 被阅读29次

    准备工作

    资料

    https://www.gitbook.com/book/mcxiaoke/rxdocs/details

    https://zhuanlan.zhihu.com/p/20687178

    http://gank.io/post/560e15be2dca930e00da1083

    RxJava 到底是什么?

    其实 RxJava 不是什么。

    实际上,Rx 是 Reactive Extensions(反应式扩展框架) 的缩写。

    RxJava 不过是其中支持Java 语言的 Reactive Extensions而已。

    也就是说 ,Rx 是一个反应式扩展框架。或者称为响应式扩展框架。

    Rx就是一个典型的函数式编程。

    RxJava 怎么火起来了

    RxJava 从13年发布,15年大家都在讨论,RxJava 逐渐的火起来了。

    RxJava能火起来其实主要就是异步。对于多线程的操作,我相信没有几个人你能真正掌握的,特别是线程的同步,绝对折磨人,而RXJava 能解决的就是这些问题,使得你只需关心业务而不去关心这些细节的东西。

    另一个是简洁,无论多么复杂的业务,Rx都能做到很简洁。

    举个例子:

    碰到过这样的场景,Android端需要展示的数据在二个不同的服务器上,我需要从二个服务器拿到数据后,合并展示。

    对于这样的需求,如果采用传统的方法,我相信没有几个同学能搞定的,线程间的相互等待,同步,没有几个人搞的清楚,那么有了RxJava 呢?

    看看下面的伪代码:

    
       Observable<String> work1 = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> t) {
                    String value1 = network1();
                    t.onNext(value1);
                    
                }
            })
            .subscribeOn(Schedulers.io());
            
    Observable<String> work2 = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> t) {
                
                    String value2 = network2();
                    t.onNext(value2);
                    
                }
            })
            .subscribeOn(Schedulers.io());
            
    Observable.zip(work1, work2, new Func2<String, String, String>() {
    
                @Override
                public String call(String t1, String t2) {
                    
                    return t1+t2;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
    
                @Override
                public void call(String t) {
                    
                    //更新数据到ui的相关代码
                    
                }
            });
    
    
    

    怎么样相当的简洁吧,几行代码解决了问题。我不在去考虑线程的等待同步问题了。

    Rx能解决什么问题,为什么要使用它

    从上面的描述和例子可以看出,Rx 胜在异步。有了它,你不必花费太多的心思在线程上。线程的同步,安全等这些问题都交给它吧,它会为你干好的。

    实际上,RxJava 在github上是这么介绍的:

    RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

    It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

    英语好的同学自己翻译,我就不翻译了。

    开始学习RxJava

    在学习RXJava 之前,首先要深刻的理解 观察者设计模式,如果连观察者设计模式都搞不清,那么还是停下来去看看观察者设计模式是什么样子的。

    Rx 的使用分为三个步骤:

    1. 创建 被观察者 Observer。
    2. 创建观察者 Observable
    3. 被观察者注册观察者。

    例子如下:

    Observable<String> observable = Observable.create(new OnSubscribe<String>() {
    
                @Override
                public void call(Subscriber<? super String> t) {
                    // TODO Auto-generated method stub
                    t.onNext("测试");
                    t.onCompleted();
                }
            });
            
    Observer<String> observer = new Observer<String>() {
                
                @Override
                public void onNext(String t) {
                    // TODO Auto-generated method stub
                    
                }
                
                @Override
                public void onError(Throwable e) {
                    // TODO Auto-generated method stub
                    
                }
                
                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub
                    
                }
            };
            
    observable.subscribe(observer);
    
    

    需要注意的是,在观察者模式中,是观察者去订阅 被观察者对象的,而RxJava中,是被观察的对象去订阅观察者,这一点非常的重要。

    RxJava 从简单的角度讲就是那些,其它的操作符都是在上面进行了扩展,本质上讲 就是那三个步骤。

    这三个步骤中,会涉及到三个对象:

    • Observer
    • Observable
    • 在创建Observer的时候 涉及到的一个对象OnSubscribe。

    对于OnSubscribe对象,其实相当于在观察者模式中被观察者 notifyDateChange方法的功能。

    操作符号

    RxJava 的操作符号比较多。全部记住这些操作符号,没有必要,更多的是在实际应用中去查找那个操作符适合目前的业务场景,用多了自然记住了,总的来说,这些操作符号分为下面几种。

    看懂图

    对于操作符号,真的记不住那么多,用的时候去拿,关键是要看懂图

    imageimage

    如下这张图:

    这张图 是 flatMap操作符的。我们需要明白下面这些东西

    imageimage

    每个Observer 通过 map规则转其它的Observer,其转换规则是,将 圆转成成一个菱形加上一个正方形。

    把上面的图看懂,基本上可以熟练的使用 RxJava了,请记住。无论是什么操作符都是针对Observer 的。

    一些常用操作符号

    创建操作符

    创建操作符,用来创建 Observer 对象的,常见的创建操作符有

    • Create.
    • Defer
    • From
    • just
    • Range
    • Repeat
    • Interval

    转换操作符号

    转换操作符的作用,是将一个ObServer 转换成另一个Observer。

    如 将Observer<String> 转成成另一个 Observer<Long> 可以是如下代码:

    
    Observable.just("1")
            .map(new Func1<String, Long>() {
    
                @Override
                public Long call(String t) {
                    // TODO Auto-generated method stub
                    return Long.valueOf(t);
                }
            })
            .subscribe(new Action1<Long>() {
    
                @Override
                public void call(Long t) {
                    // TODO Auto-generated method stub
                    
                }
            });
    
    

    转换符号中,用到的最多的是 map 和 flatMap。

    转换符有下面这些:

    • Buffer
    • FlatMap
    • Map
    • Scan
    • Window

    过滤符号

    过滤符号用于过滤那些条件不满足的Observer。

    • Take
    • Filter
    • First
    • ....

    组合操作符

    组合操作符用于将组合多个Observer 。

    • startWith() — 在数据序列的开头增加一项数据

    • merge() — 将多个Observable合并为一个

    • mergeDelayError() 合并多个Observables,让没有错误的Observable都完成后再发射错误通知

    • zip() — 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果

    • and(), then(), and when() — (rxjava-joins) 通过模式和计划组合多个Observables发射的数据集合

    • combineLatest() — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果

    • join() and groupJoin() — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射

    • switchOnNext() — 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

    错误处理

    • Catch
    • retry

    其它相关的操作符号

    https://www.gitbook.com/book/mcxiaoke/rxdocs/details

    后记

    RxJava2 发布了,RxJava也要成为历史了。

    相关文章

      网友评论

          本文标题:RxJava使用

          本文链接:https://www.haomeiwen.com/subject/ohbqtxtx.html