Action###
先看看之前使用的代码如果使用Action来代替Subscriber得到的代码是这样的
Observable.just("Hi", "Man")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
我的天呐,舒服多了有没有!!
什么是Action
Action是RxJava 的一个接口,常用的有Action0和Action1。
Action0: 它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调
Ation1:它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调
Action的使用
定义三个对象,分别打包onNext(obj)、onError(error) 、onCompleted()
Observable observable = Observable.just("Hello", "World");
//处理onNext()中的内容
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
};
//处理onError()中的内容
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
};
//处理onCompleted()中的内容
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.i(TAG, "Completed");
}
};
接下来使用subscribe重载的方法
//使用 onNextAction 来定义 onNext()
Observable.just("Hello", "World").subscribe(onNextAction);
//使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
Observable.just("Hello", "World").subscribe(onNextAction, onErrorAction);
//使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
Observable.just("Hello", "World").subscribe(onNextAction, onErrorAction, onCompletedAction);
重新写打印"Hi", "Man"的方法
Observable.just("Hi","Man").subscribe(new Action1<String>() {
@Override
public void call(String s) {
// TODO Auto-generated method stub
Log.i(TAG, s);
}
});
Scheduler
前言
首先,RxJava得模式大概这样子:由Observable发起事件,经过中间的处理后由Observer消费。(对RxJava还不了解的再去看一遍)
在没使用Scheduler之前,事件的发起和消费都是在同一个线程中执行,也就是说不使用Scheduler的RxJava是同步的~~~显然不好吧
介绍
RxJava在不指定线程的情况下,发起时间和消费时间默认使用当前线程。所以之前的做法
Observable.just(student1, student2, student2)
//使用map进行转换,参数1:转换前的类型,参数2:转换后的类型
.map(new Func1<Student, String>() {
@Override
public String call(Student i) {
String name = i.getName();//获取Student对象中的name
return name;//返回name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});
因为是在主线程中发起的,所以不管中间map的处理还是Action1的执行都是在主线程中进行的。若是map中有耗时的操作,这样会导致主线程拥塞,这并不是我们想看到的。
Scheduler
Scheduler:线程控制器,可以指定每一段代码在什么样的线程中执行。
模拟一个需求:新的线程发起事件,在主线程中消费
private void rxJavaTest3() {
Observable.just("Hello", "Word")
.subscribeOn(Schedulers.newThread())//指定 subscribe() 发生在新的线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});
上面用到了subscribeOn(),和observeOn()方法来指定发生的线程和消费的线程。
subscribeOn():指定subscribe() 所发生的线程,即Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn():指定Subscriber 所运行在的线程。或者叫做事件消费的线程。
常用Scheduler线程
Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行。
多次切换线程
看完上面的介绍想必对RxJava线程的切换有了一些理解,上面只是对事件的发起和消费制定了线程。
如果中间有map之类的操作呢?是否可以实现发起的线程在新线程中,map的处理在IO线程,最后的消费在主线程中。
Observable.just("Hello", "Wrold")
.subscribeOn(Schedulers.newThread())//指定:在新的线程中发起
.observeOn(Schedulers.io()) //指定:在io线程中处理
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return handleString(s); //处理数据
}
})
.observeOn(AndroidSchedulers.mainThread())//指定:在主线程中处理
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
show(s); //消费事件
}
});
可以看到observeOn()被调用了两次,分别指定了map的处理的现场和消费事件show(s)的线程。
若将observeOn(AndroidSchedulers.mainThread())去掉会怎么样?不为消费事件show(s)指定线程后,show(s)会在那里执行?
其实,observeOn() 指定的是它之后的操作所在的线程。也就是说,map的处理和最后的消费事件show(s)都会在io线程中执行。
observeOn()可以多次使用,可以随意变换线程
Subscription
前言
RxJava如何取消订阅?
就是它 Subscription
RxJava中有个叫做Subscription的接口,可以用来取消订阅.
publicinterfaceSubscription{
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/voidunsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/booleanisUnsubscribed();
}
从上面可以看到,只需要调用unsubscribe就可以取消订阅,那么如何得到一个Subscription对象呢?
其实很简单:
Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回.
感觉Subscription就像一个订单,你下单了就会生成一个订单,而你也可以用这个订单取消订单.
OK,内容其实不多,那么来练习一下吧.
实战
Subscription subscription = Observable.just("Hello subscription")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
System.out.println(subscription.isUnsubscribed());
subscription.unsubscribe();
System.out.println(subscription.isUnsubscribed());
在我想来输出的日志应该是这样的:
Hello subscription
falsetrue
但是,结果出乎我的意料,我运行之后是这样的:
Hello subscription
truetrue
这是什么鬼啊
网友评论