RxJava 进击的Rx

作者: 花前月下的细说 | 来源:发表于2016-11-18 13:57 被阅读73次

Action###

先看看之前使用的代码如果使用Action来代替Subscriber得到的代码是这样的

Observable.just("Hi", "Man")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, s);
                    }
                });

我的天呐,舒服多了有没有!!


什么是Action

Action是RxJava 的一个接口,常用的有Action0和Action1。

Action0: 它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调

Ation1:它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调

Action的使用

定义三个对象,分别打包onNext(obj)、onError(error) 、onCompleted()

 Observable observable = Observable.just("Hello", "World");
      //处理onNext()中的内容
      Action1<String> onNextAction = new Action1<String>() {
          @Override
          public void call(String s) {
              Log.i(TAG, s);
          }
      };
      //处理onError()中的内容
      Action1<Throwable> onErrorAction = new Action1<Throwable>() {
          @Override
          public void call(Throwable throwable) {

          }
      };
      //处理onCompleted()中的内容
      Action0 onCompletedAction = new Action0() {
          @Override
          public void call() {
              Log.i(TAG, "Completed");

          }
      };

接下来使用subscribe重载的方法

//使用 onNextAction 来定义 onNext()
Observable.just("Hello", "World").subscribe(onNextAction);
//使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
Observable.just("Hello", "World").subscribe(onNextAction, onErrorAction);
//使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
Observable.just("Hello", "World").subscribe(onNextAction, onErrorAction, onCompletedAction);

重新写打印"Hi", "Man"的方法

Observable.just("Hi","Man").subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
        // TODO Auto-generated method stub
        Log.i(TAG, s);
         }
        });

Scheduler

前言

首先,RxJava得模式大概这样子:由Observable发起事件,经过中间的处理后由Observer消费。(对RxJava还不了解的再去看一遍)

在没使用Scheduler之前,事件的发起和消费都是在同一个线程中执行,也就是说不使用Scheduler的RxJava是同步的~~~显然不好吧

介绍

RxJava在不指定线程的情况下,发起时间和消费时间默认使用当前线程。所以之前的做法

Observable.just(student1, student2, student2)
//使用map进行转换,参数1:转换前的类型,参数2:转换后的类型
.map(new Func1<Student, String>() {
@Override
public String call(Student i) {
String name = i.getName();//获取Student对象中的name
return name;//返回name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});

因为是在主线程中发起的,所以不管中间map的处理还是Action1的执行都是在主线程中进行的。若是map中有耗时的操作,这样会导致主线程拥塞,这并不是我们想看到的。

Scheduler

Scheduler:线程控制器,可以指定每一段代码在什么样的线程中执行。

    模拟一个需求:新的线程发起事件,在主线程中消费
private void rxJavaTest3() {
Observable.just("Hello", "Word")
.subscribeOn(Schedulers.newThread())//指定 subscribe() 发生在新的线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
            });

上面用到了subscribeOn(),和observeOn()方法来指定发生的线程和消费的线程。

subscribeOn():指定subscribe() 所发生的线程,即Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

observeOn():指定Subscriber 所运行在的线程。或者叫做事件消费的线程。

常用Scheduler线程

Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread():总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行。

多次切换线程

看完上面的介绍想必对RxJava线程的切换有了一些理解,上面只是对事件的发起和消费制定了线程。

如果中间有map之类的操作呢?是否可以实现发起的线程在新线程中,map的处理在IO线程,最后的消费在主线程中。

Observable.just("Hello", "Wrold")
.subscribeOn(Schedulers.newThread())//指定:在新的线程中发起
.observeOn(Schedulers.io())         //指定:在io线程中处理
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return handleString(s);       //处理数据
}
})
.observeOn(AndroidSchedulers.mainThread())//指定:在主线程中处理
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
show(s);                       //消费事件
}
});

可以看到observeOn()被调用了两次,分别指定了map的处理的现场和消费事件show(s)的线程。

若将observeOn(AndroidSchedulers.mainThread())去掉会怎么样?不为消费事件show(s)指定线程后,show(s)会在那里执行?

其实,observeOn() 指定的是它之后的操作所在的线程。也就是说,map的处理和最后的消费事件show(s)都会在io线程中执行。
observeOn()可以多次使用,可以随意变换线程

Subscription

前言

RxJava如何取消订阅?

就是它 Subscription

RxJava中有个叫做Subscription的接口,可以用来取消订阅.

publicinterfaceSubscription{
/**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */voidunsubscribe();

/**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */booleanisUnsubscribed();
}

从上面可以看到,只需要调用unsubscribe就可以取消订阅,那么如何得到一个Subscription对象呢?

其实很简单:
Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回.

感觉Subscription就像一个订单,你下单了就会生成一个订单,而你也可以用这个订单取消订单.

OK,内容其实不多,那么来练习一下吧.

实战

Subscription subscription = Observable.just("Hello subscription")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
        });
System.out.println(subscription.isUnsubscribed());
subscription.unsubscribe();
System.out.println(subscription.isUnsubscribed());

在我想来输出的日志应该是这样的:

    Hello subscription
    falsetrue

但是,结果出乎我的意料,我运行之后是这样的:

    Hello subscription
    truetrue
这是什么鬼啊

相关文章

网友评论

    本文标题:RxJava 进击的Rx

    本文链接:https://www.haomeiwen.com/subject/cuajpttx.html