rxjava

作者: 奈何心善 | 来源:发表于2016-12-05 17:04 被阅读190次

    http://blog.csdn.net/yyh352091626/article/details/53304728

    目录

    前言

    RxJava 概念初步

    作用 - 异步

    模式 - 观察者模式

    结构 - 响应式编程

    优势 - 逻辑简洁

    RxJava 依赖

    RxJava 入门

    事件产生

    事件消费

    事件订阅

    区分回调动作

    入门示例

    RxJava 进阶

    Scheduler线程控制

    变换

    map操作符

    flatMap操作符

    RxJava 其他常用操作符

    RxJava 应用

    RxJavaRetrofit 的网络请求方式

    RxBus

    RxBinding

    RxJava 的一些坑

    未取消订阅而引起的内存泄漏

    总结

    前言

    转载请注明出处:http://blog.csdn.net/yyh352091626/article/details/53304728

    使用了RxJava有一段时间了,深深感受到了其“牛逼”之处。下面,就从RxJava的基础开始,一步一步与大家分享一下这个强大的异步库的用法!

    PS: 建议先看看上面的目录结构,便于理解,哈哈!

    RxJava 概念初步

    RxJava 在Github Repo上给的解释是:

    “RxJavaisa Java VMimplementationofReactive Extensions: alibraryforcomposing asynchronousandevent-based programsbyusingobservable sequences.”

    1

    2

    3

    1

    2

    3

    大概就是说RxJava是JavaVM上一个灵活的、使用可观测序列来组成的一个异步的、基于事件的库。咋一看好像不知道是啥东西… … 没事,往下看~

    作用 - 异步

    上面这段解释,重点就在于异步!但是它又不像AsyncTask这样用法简单,所以刚接触RxJava的童鞋,可能会觉得特别难,无从下手,没事,相信通过这篇文章,大伙儿可以有一个比较深刻的理解!

    RxJava精华可以浓缩为异步两个字,其核心的东西不外乎两个:

    1.  Observable(被观察者)

    2.  Observer/Subscriber(观察者)

    Observables可以发出一系列的事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer/Subscriber 的回调处理。

    模式 - 观察者模式

    观察者模式是一种对象的行为模式,是 Java 设计模式中很常用的一个模式。观察者模式也常称为:

    发布-订阅模式(Publish/Subscribe)

    模型-视图模式(Model/View)

    源-监听器模式(Source/Listener)

    从属者模式(Dependents)

    例如用过事件总线 EventBus 库的童鞋就知道,EventBus 属于发布-订阅模式(Publish/Subscribe)。

    // 事件订阅@Subscribe(threadMode = ThreadMode.MAIN)publicvoidshowDownProgress(MyEvent event) {// TODO}// 事件发布EventBus.getDefault().post(newMyEvent());

    1

    2

    3

    4

    5

    6

    7

    8

    1

    2

    3

    4

    5

    6

    7

    8

    实际上,使用 RxJava 也可以设计出一套事件总线的库,这个称为RxBus。有兴趣的话可以在学完 RxJava 之后,可以尝试写一个。这里就不细说了~

    为啥说这个呢?因为,RxJava 也是一种扩展的观察者模式!

    举个栗子,Android中 View 的点击监听器的实现,View 是被观察者,OnClickListener 对象是观察者,Activity 要如何知道 View 被点击了?那就是构造一个 OnClickListener 对象,通过 setOnClickListener 与View达成一个订阅关系,一旦 View 被点击了,就通过OnClickListener对象的 OnClick 方法传达给 Activity 。采用观察者模式可以避免去轮询检查,节约有限的cpu资源。

    结构 - 响应式编程

    响应式?顾名思义,就是“你变化,我响应”。举个栗子,a = b + c;这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。

    响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:

    Observable->Operator1->Operator2->Operator3->Subscriber

    1

    1

    这个流程,可以简单的理解为:

    Observable发出一系列事件,他是事件的产生者;

    Subscriber负责处理事件,他是事件的消费者;

    Operator是对 Observable 发出的事件进行修改和变换;

    若事件从产生到消费不需要其他处理,则可以省略掉中间的 Operator,从而流程变为Obsevable -> Subscriber;

    Subscriber 通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的事务处理则交给 Operator;

    优势 - 逻辑简洁

    Rx 优势可以概括为四个字,那就是逻辑简洁。然而,逻辑简洁并不意味着代码简洁,但是,由于链式结构,一条龙,你可以从头到尾,从上到下,很清楚的看到这个连式结构的执行顺序。对于开发人员来说,代码质量并不在于代码量,而在于逻辑是否清晰简洁,可维护性如何,代码是否健壮!

    另外,熟悉lambda的,还可以进一步提高代码的简洁性。举个简单栗子对比一下,暂时不需要过多理解,后面会一一道来:

    // 不使用lambdaObservable.just("Hello World!")    .map(newFunc1() {@OverridepublicStringcall(String s) {returns +"I am kyrie!";        }    })    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(newAction1() {@Overridepublicvoidcall(String s) {            Log.i(TAG, s);        }    });// 使用lambdaObservable.just("Hello World!")    .map(s -> s +"I am kyrie!")    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(s -> {        Log.i(TAG, s);    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    RxJava 依赖

    在 Android Studio 项目下,为 module 增加 Gradle 依赖。

    // Android 平台下须引入的一个依赖,主要用于线程控制compile'io.reactivex:rxandroid:1.1.0'// RxJavacompile'io.reactivex:rxjava:1.1.5'

    1

    2

    3

    4

    1

    2

    3

    4

    这是我项目里面用的版本,也可以到Maven/RxJava下获取最新版本。

    RxJava 入门

    前面讲了那么多,大家在概念上对RxJava有一个初步的认识就好,接下来,将为您解开RxJava神秘的面纱~~

    无需过分纠结于“事件”这个词,暂时可以简单的把“事件”看成是一个值,或者一个对象。

    事件产生,就是构造要传递的对象;

    事件处理变换,就是改变传递的对象,可以改变对象的值,或是干脆创建个新对象,新对象类型也可以与源对象不一样;

    事件处理,就是接收到对象后要做的事;

    事件产生

    RxJava创建一个事件比较简单,由 Observable 通过 create 操作符来创建。举个栗子,还是经典的 HelloWorld~~

    // 创建一个ObservableObservable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {// 发送一个 Hello World 事件subscriber.onNext("Hello World!");// 事件发送完成subscriber.onCompleted();    }});

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    这段代码可以理解为, Observable 发出了一个类型为 String ,值为 “Hello World!” 的事件,仅此而已。

    对于 Subscriber 来说,通常onNext()可以多次调用,最后调用onCompleted()表示事件发送完成。

    上面这段代码,也可以通过just操作符进行简化。RxJava常用操作符后面会详细介绍,这里先有个了解。

    // 创建对象,just里面的每一个参数,相当于调用一次Subscriber#OnNext()Observable observable = Observable.just("Hello World!");

    1

    2

    1

    2

    这样,是不是简单了许多?

    事件消费

    有事件产生,自然也要有事件消费。RxJava 可以通过 subscribe 操作符,对上述事件进行消费。首先,先创建一个观察者。

    // 创建一个ObserverObserver observer =newObserver() {@OverridepublicvoidonCompleted() {        Log.i(TAG,"complete");    }@OverridepublicvoidonError(Throwable e) {    }@OverridepublicvoidonNext(String s) {        Log.i(TAG, s);    }};

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    或者

    // 创建一个SubscriberSubscriber subscriber =newSubscriber() {@OverridepublicvoidonCompleted() {        Log.i(TAG,"complete");    }@OverridepublicvoidonError(Throwable e) {    }@OverridepublicvoidonNext(String s) {        Log.i(TAG, s);    }};

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    Observer 是观察者, Subscriber 也是观察者,Subscriber 是一个实现了Observer接口的抽象类,对 Observer 进行了部分扩展,在使用上基本没有区别;

    Subscriber 多了发送之前调用的onStart()和解除订阅关系的unsubscribe()方法。

    并且,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以在这之后的示例代码,都使用 Subscriber 来作为观察者。

    事件订阅

    最后,我们可以调用 subscribe 操作符, 进行事件订阅。

    // 订阅事件observable.subscribe(subscriber);

    1

    2

    1

    2

    在 Subscriber 实现的三个方法中,顾名思义,对应三种不同状态:

    1.onComplete(): 事件全部处理完成后回调

    2.onError(Throwable t): 事件处理异常回调

    3.onNext(T t): 每接收到一个事件,回调一次

    区分回调动作

    对于事件消费事件订阅来说,好像为了打印一个“Hello World!”要费好大的劲… 其实,RxJava 自身提供了精简回调方式,我们可以为 Subscriber 中的三种状态根据自身需要分别创建一个回调动作Action:

    // onComplete()Action0 onCompleteAction =newAction0() {@Overridepublicvoidcall() {        Log.i(TAG,"complete");    }};// onNext(T t)Action1 onNextAction =newAction1() {@Overridepublicvoidcall(String s) {        Log.i(TAG, s);    }};// onError(Throwable t)Action1 onErrorAction =newAction1() {@Overridepublicvoidcall(Throwable throwable) {    }};

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    那么,RxJava 的事件订阅支持以下三种不完整定义的回调。

    observable.subscribe(onNextAction);

    observable.subscribe(onNextAction, onErrorAction);

    observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

    1

    2

    3

    4

    5

    1

    2

    3

    4

    5

    我们可以根据当前需要,传入对应的 Action, RxJava 会相应的自动创建 Subscriber。

    Action0 表示一个无回调参数的Action;

    Action1 表示一个含有一个回调参数的Action;

    当然,还有Action2 ~ Action9,分别对应2~9个参数的Action;

    每个Action,都有一个 call() 方法,通过泛型T,来指定对应参数的类型;

    入门示例

    前面讲解了事件的产生到消费、订阅的过程,下面就举个完整的例子。从res/mipmap中取出一张图片,显示在ImageView上。

    finalImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {// 从mipmap取出一张图片作为Drawable对象Drawable drawable = ContextCompat.getDrawable(mContext, R.mipmap.ic_launcher);// 把Drawable对象发送出去subscriber.onNext(drawable);            subscriber.onCompleted();        }    })    .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {            Log.i(TAG, e.toString());        }@OverridepublicvoidonNext(Drawable drawable) {// 接收到Drawable对象,显示在ImageView上ivLogo.setImageDrawable(drawable);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    上面示例是RxJava最基本的一个用法。稍微消化一下,继续~~

    RxJava 进阶

    Scheduler线程控制

    默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程。

    那么问题来了,假如事件产生的过程是耗时操作,比如网络请求,结果显示在UI中,这个时候在主线程执行对于网络请求就不合适了,而在子线程执行,显示结果需要进行UI操作,同样不合适~~

    所以,RxJava 的第一个牛逼之处在于可以自由切换线程!那么,如何做?

    在 RxJava 中,提供了一个名为Scheduler的线程调度器,RxJava 内部提供了4个调度器,分别是:

    Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()效率比newThread()更高。值得注意的是,在io()下,不要进行大量的计算,以免产生不必要的线程;

    Schedulers.newThread(): 开启新线程操作;

    Schedulers.immediate(): 默认指定的线程,也就是当前线程;

    Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU;

    AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

    我们可以通过subscribeOn()和observeOn()这两个方法来进行线程调度。举个栗子:

    依然还是显示一张图片,不同的是,这次是从网络上加载图片

    finalImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {try{            Drawable drawable = Drawable.createFromStream(newURL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(),"src");            subscriber.onNext(drawable);        }catch(IOException e) {            subscriber.onError(e);        }    }})// 指定 subscribe() 所在的线程,也就是上面call()方法调用的线程.subscribeOn(Schedulers.io())// 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程.observeOn(AndroidSchedulers.mainThread())        .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {            }@OverridepublicvoidonError(Throwable e) {                Log.e(TAG, e.toString());            }@OverridepublicvoidonNext(Drawable drawable) {                ivLogo.setImageDrawable(drawable);            }        });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    所以,这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

    变换

    RxJava的又一牛逼之处,在于变换。啥意思呢? 就是将发送的事件或事件序列,加工后转换成不同的事件或事件序列。

    map操作符

    变换的概念不好理解吧?举个简单的栗子,我们对上述示例进行改写。

    finalImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {        subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");    }}).map(newFunc1() {@OverridepublicDrawablecall(String url) {try{            Drawable drawable = Drawable.createFromStream(newURL(url).openStream(),"src");returndrawable;        }catch(IOException e) {        }returnnull;    }})// 指定 subscribe() 所在的线程,也就是call()方法调用的线程.subscribeOn(Schedulers.io())// 指定 Subscriber 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程.observeOn(AndroidSchedulers.mainThread())        .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {            }@OverridepublicvoidonError(Throwable e) {                Log.e(TAG, e.toString());            }@OverridepublicvoidonNext(Drawable drawable) {if(drawable !=null) {                    ivLogo.setImageDrawable(drawable);                }            }        });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过map操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是:

    Observable-->map变换-->Observable

    1

    1

    那么,Func1是什么呢?与Action1类似,不同的是FuncX有返回值,而ActionX没有。为什么需要返回值呢?目的就在于对象的变换,由String对象转换为Drawable对象。同样,也有Func0 ~ Func9,对应不同的参数个数。

    当然了,RxJava 的变换,可不止于map这么简单,继续往下!

    flatMap操作符

    不难发现,上述的map操作符,是一对一的变换,并且返回的是变换后的对象。而flatMap操作符可以适应一对多,并且返回的是一个Observable。应用场景举例:例如一个员工负责多个任务,现在要打印所有员工的所有任务。

    finalList list =newArrayList() {    {        add(newEmployee("jackson", mission_list1));        add(newEmployee("sunny", mission_list2));    }};Observable.from(list)        .flatMap(newFunc1>() {@OverridepublicObservablecall(Employee employee) {returnObservable.from(employee.missions);            }        })        .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {            }@OverridepublicvoidonError(Throwable e) {            }@OverridepublicvoidonNext(Employee.Mission mission) {                Log.i(TAG, mission.desc);            }        });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    执行结果为顺序打印出两位员工的所有任务列表。

    通过上面的代码可以看出,map与flatMap这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

    flatMap返回的是一个Observable对象,而map返回的是一个普通转换后的对象;

    flatMap返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而map变换后返回的对象直接发到Subscriber回调中;

    flatMap变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;

    map返回类型 与flatMap返回的Observable事件类型,可以与原来的事件类型一样;

    可以对一个Observable多次使用map和flatMap;

    鉴于flatMap自身强大的功能,这常常被用于嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的onSuccess()回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

    Github上的README.md文件,通常是 MarkDown 语法。我们要获取README.md内容并按 MarkDown 风格显示在UI上,就可以通过以下方式(Retrofit2 + RxJava,稍后会介绍):

    newReadmeContentClient()// 获取md语法的Readme内容, 返回的是一个Observable对象.getReadme()    .flatMap(newFunc1>() {@OverridepublicObservablecall(String md) {// 由于Readme的内容是md语法,需要转成html字符串通过WebView显示到UI// 返回的也是Observable对象returnnewMarkDownStyleClient(md)                            .formatMarkStyle();        }    })    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(newObserver() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {            Log.e(TAG,"readme:"+ e.toString());        }@OverridepublicvoidonNext(String html) {// html就是根据readme md格式内容,生成的html代码view.showReadme(html);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    RxJava 其他常用操作符

    from

    接收一个集合作为输入,然后每次输出一个元素给subscriber。

    // Observable.from(T[] params)Observable.from(newInteger[]{1,2,3,4,5})    .subscribe(newAction1() {@Overridepublicvoidcall(Integer number) {            Log.i(TAG,"number:"+ number);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    1

    2

    3

    4

    5

    6

    7

    8

    注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

    just

    接收一个可变参数作为输入,最终也是生成数组,调用from(),然后每次输出一个元素给subscriber。

    // Observable.just(T... params),params的个数为1 ~ 10Observable.just(1,2,3,4,5)    .subscribe(newAction1() {@Overridepublicvoidcall(Integer number) {            Log.i(TAG,"number:"+ number);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    1

    2

    3

    4

    5

    6

    7

    8

    filter

    条件过滤,去除不符合某些条件的事件。举个栗子:

    Observable.from(newInteger[]{1,2,3,4,5})    .filter(newFunc1() {@OverridepublicBooleancall(Integer number) {// 偶数返回true,则表示剔除奇数,留下偶数returnnumber %2==0;        }    })    .subscribe(newAction1() {@Overridepublicvoidcall(Integer number) {            Log.i(TAG,"number:"+ number);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    take

    最多保留的事件数。

    doOnNext

    在处理下一个事件之前要做的事。

    Observable.from(newInteger[]{1,2,3,4,5,6,7,8,9,10,11,12})    .filter(newFunc1() {@OverridepublicBooleancall(Integer number) {// 偶数返回true,则表示剔除奇数returnnumber %2==0;        }    })// 最多保留三个,也就是最后剩三个偶数.take(3)    .doOnNext(newAction1() {@Overridepublicvoidcall(Integer number) {// 在输出偶数之前输出它的hashCodeLog.i(TAG,"hahcode = "+ number.hashCode() +"");        }    })    .subscribe(newAction1() {@Overridepublicvoidcall(Integer number) {            Log.i(TAG,"number = "+ number);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    输出如下:

    hahcode =2number =2hahcode =4number =4hahcode =6number =6

    1

    2

    3

    4

    5

    6

    1

    2

    3

    4

    5

    6

    debounce

    通俗点讲,就是N个事件发生的时间间隔太近,就过滤掉前N-1个事件,保留最后一个事件。debounce可以指定这个时间间隔!可以用在SearchEditText请求关键词的地方,SearchEditText的内容变化太快,可以抵制频繁请求关键词,后面第15条15.Subject会介绍这个。为了演示效果,先举个简单栗子:

    Observable    .create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {inti =0;int[] times =newint[]{100,1000};while(true) {                i++;if(i >=100)break;                subscriber.onNext(i);try{// 注意!!!!// 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉// 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉Thread.sleep(times[i %2]);                }catch(InterruptedException e) {                    e.printStackTrace();                }            }            subscriber.onCompleted();        }    })// 间隔400ms以内的事件将被丢弃.debounce(400, TimeUnit.MILLISECONDS)    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {            Log.i(TAG,"complete");        }@OverridepublicvoidonError(Throwable e) {            Log.e(TAG, e.toString());        }@OverridepublicvoidonNext(Integer integer) {            Log.i(TAG,"integer = "+ integer);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    输出结果:

    11-2310:44:45.167MainActivity: integer =111-2310:44:46.270MainActivity: integer =311-2310:44:47.373MainActivity: integer =511-2310:44:48.470MainActivity: integer =711-2310:44:49.570MainActivity: integer =911-2310:44:50.671MainActivity: integer =1111-2310:44:51.772MainActivity: integer =1311-2310:44:52.872MainActivity: integer =1511-2310:44:53.973MainActivity: integer =17...

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    我们设置过滤条件为400ms,可以发现,奇数正常输出,因为在它的下一个事件事件隔了1000ms,所以它不会被过滤掉;偶数被过滤掉,是因为它距离下一个事件(奇数)只隔了100ms。并且,输出的两个事件相隔大约为100ms + 1000ms = 1100ms。

    merge

    用于合并两个Observable为一个Observable。较为简单。

    Observable.merge(Observable1, Observable2)

    .subscribe(subscriber);

    1

    2

    1

    2

    concat

    顺序执行多个Observable,个数为1 ~ 9。例子稍后与first操作符一起~~

    compose

    与flatMap类似,都是进行变换,返回Observable对象,激活并发送事件。

    compose是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用compose来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。

    compose是对 Observable 整体的变换,换句话说,flatMap转换Observable里的每一个事件,而compose转换的是整个Observable数据流。

    flatMap每发送一个事件都创建一个 Observable,所以效率较低。而compose操作符只在主干数据流上执行操作。

    建议使用compose代替flatMap。

    first

    只发送符合条件的第一个事件。可以与前面的contact操作符,做网络缓存。举个栗子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

    // 从缓存获取Observable fromDisk = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber) {        BookList list = getFromDisk();if(list !=null) {            subscriber.onNext(list);        }else{            subscriber.onCompleted();        }    }});// 从网络获取Observable fromNetWork = bookApi.getBookDetailDisscussionList();Observable.concat(fromDisk, fromNetWork)// 如果缓存不为null,则不再进行网络请求。反之.first()        .subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {            }@OverridepublicvoidonError(Throwable e) {            }@OverridepublicvoidonNext(BookList discussionList) {            }        });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    网络缓存用法,具体可参见我的项目:https://github.com/JustWayward/BookReader

    timer

    可以做定时操作,换句话讲,就是延迟执行。事件间隔由timer控制。举个栗子:两秒后输出“Hello World!”

    Observable.timer(2, TimeUnit.SECONDS)    .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {        }@OverridepublicvoidonNext(Long aLong) {            Log.i(TAG,"Hello World!");        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    interval

    定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制。举个栗子:每隔两秒输出“Hello World!”

    Observable.interval(2, TimeUnit.SECONDS)    .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {        }@OverridepublicvoidonNext(Long aLong) {            Log.i(TAG,"Hello World!");        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    throttleFirst

    与debounce类似,也是时间间隔太短,就丢弃事件。可以用于防抖操作,比如防止双击。

    RxView.clicks(button)  .throttleFirst(1, TimeUnit.SECONDS)  .subscribe(newObserver() {@OverridepublicvoidonCompleted() {      }@OverridepublicvoidonError(Throwable e) {      }@OverridepublicvoidonNext(Object o) {          Log.i(TAG,"do clicked!");      }  });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    上面这个RxView详见:https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定,JakeWharton大神的项目,厉害。

    Single

    Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

    Single.create(newSingle.OnSubscribe() {@Overridepublicvoidcall(SingleSubscriber subscriber) {        subscriber.onSuccess("Hello");    }}).subscribe(newSingleSubscriber() {@OverridepublicvoidonSuccess(Object value) {        Log.i(TAG, value.toString());    }@OverridepublicvoidonError(Throwable error) {    }});

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    Subject

    Subject这个类,既是Observable又是Observer,啥意思呢?就是它自身既是事件的生产者,又是事件的消费者,相当于自身是一条管道,从一端进,又从另一端出。举个栗子:PublishSubject

    Subject subject = PublishSubject.create();// 1.由于Subject是Observable,所以进行订阅subject.subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {    }@OverridepublicvoidonError(Throwable e) {    }@OverridepublicvoidonNext(Object o) {        Log.i(TAG, o.toString());    }});// 2.由于Subject同时也是Observer,所以可以调用onNext发送数据subject.onNext("world");

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    这个好像有点厉害的样子,哈哈。可以配合debounce,避免SearchEditText频繁请求。

    Subject subject = PublishSubject.create();subject.debounce(400, TimeUnit.MILLISECONDS)        .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {        }@OverridepublicvoidonNext(Object o) {// request}    });edittext.addTextChangedListener(newTextWatcher() {@OverridepublicvoidbeforeTextChanged(CharSequence s,intstart,intcount,intafter) { }@OverridepublicvoidonTextChanged(CharSequence s,intstart,intbefore,intcount) {        subject.onNext(s.toString());    }@OverridepublicvoidafterTextChanged(Editable s) { } });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    RxJava 应用

    RxJava+Retrofit 的网络请求方式

    Retrofit是一个非常适合RestAPI的网络请求库。没用过的童鞋,还是推荐学一学的。

    使用Callback的请求方式:

    // 1. 定义一个请求接口@GET("/match/stat")Call getMatchStat(@Query("mid") String mid,@Query("tabType") String tabType);// 2. 创建Service对象Retrofit retrofit =newRetrofit.Builder()                        .baseUrl(BuildConfig.TENCENT_SERVER)                        .addConverterFactory(ScalarsConverterFactory.create())                        .client(OkHttpHelper.getTecentClient()).build();TencentApi api = retrofit.create(TencentApi.class);// 3. 调用Call call = api.getMatchStat(mid, tabType);call.enqueue(newCallback() {@OverridepublicvoidonResponse(Call call, Response response) {if(response !=null&& response.body()!=null)// 成功}else{// 无数据}    }@OverridepublicvoidonFailure(Call call, Throwable t) {// 失败}});

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    与 RxJava 结合的方式,则是

    // 1. 定义请求接口,返回的是Observable对象@GET("/user/followers")Observable> followers();// 2. 同样是创建api对象...// 3. 请求api.followers()    .subscribeOn(Schedulers.io())    .observeOn(AndroidSchedulers.mainThread())    .subscribe(newObserver>() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {// 请求出错。可能发生网络异常、Json解析异常等等}@OverridepublicvoidonNext(List list) {// 请求成功view.showMyFollowers(list);        }    });

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    若需嵌套请求,比如先获取Token再进行才能进行登录,可参考flatMap操作符最后的获取Readme内容显示在WebView上的例子。

    Retrofit2+RxJava+ Dagger2: 具体可参见我的项目,里面有比较详细的用法。

    https://github.com/JustWayward/BookReader

    不难发现,Retrofit 把请求封装进 Observable ,在请求结束后调用onNext()以及OnCompleted()或在请求失败后调用onError()。

    :RxJava形式的请求,并不能减少代码量,但是逻辑非常清晰。假如请求到数据之后需要对数据进行处理,并且是耗时操作,难道要再开一个线程,或者用AsyncTask再做一次异步?很显然,RxJava的变换很好的解决了这个问题,依然会使逻辑结构清晰。

    RxBus

    准确的来说,是一种基于RxJava实现事件总线的一种思想。可以替代EventBus/Otto,因为他们都依赖于观察者模式。可以参考https://github.com/AndroidKnife/RxBus这个库。

    RxBinding

    前面介绍过了,JakeWharton大神的项目,https://github.com/JakeWharton/RxBinding, 主要与RxJava结合用于一些View的事件绑定。

    RxJava 的一些坑

    未取消订阅而引起的内存泄漏

    举个栗子,对于前面常用操作符12.interval做周期性操作的例子,并没有使之停下来的,没有去控制订阅的生命周期,这样,就有可能引发内存泄漏。所以,在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅。

    Subscription subscription = Observable.interval(2, TimeUnit.SECONDS)    .subscribe(newSubscriber() {@OverridepublicvoidonCompleted() {        }@OverridepublicvoidonError(Throwable e) {        }@OverridepublicvoidonNext(Long aLong) {            Log.i(TAG,"Hello World!");        }    });// 调用unsubscribe();方法进行取消订阅subscription.unsubscribe();

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    但是,如果有很多个数据源,那岂不是要取消很多次?当然不是的,可以利用CompositeSubscription, 相当于一个Subscription集合。

    CompositeSubscription list =newCompositeSubscription();list.add(subscription1);list.add(subscription2);list.add(subscription3);// 统一调用一次unsubscribe,就可以把所有的订阅都取消list.unsubscribe();

    1

    2

    3

    4

    5

    6

    7

    1

    2

    3

    4

    5

    6

    7

    总结

    相信到了这里,大家对RxJava应该有了一个比较清晰的理解。当然,实践出真知,还是要去尝试,才能更深层次的体会到其强大之处。

    最后,总结一下RxJava的基本使用过程。

    首先是创建事件源源,也就是被观察者,可以用Observable的create/just/from等方法来创建;

    通过filter/debounce等操作符,进行自定义事件过滤;

    通过Schedules进行事件发送和订阅的线程控制,也就是subscribeOn()和observeOn();

    通过map/flatMap/compose等操作符,进行事件的变换;

    调用subscribe进行事件订阅;

    最后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用unsubscribe(),以免引发内存泄漏。

    感谢阅读!

    相关文章

      网友评论

          本文标题:rxjava

          本文链接:https://www.haomeiwen.com/subject/qnxlmttx.html