本文开始介绍RxJava的使用,和别的RxJava文章不同的地方在于我们直接从实战开始介绍,不讲那么多的花拳绣腿。好多RxJava启蒙文章都是你抄我我抄你,抄来抄去也就那么几个场景,换到自己的项目中还是不会使用RxJava,本文另辟蹊径实战讲解。
既然是另辟蹊径,那么肯定开场不能跟别人一样。什么Observable,什么Subscribe,什么Observe,什么Subscribe...这几个长得差不多的都是什么鬼?我们暂时统统不管,且请还没入门的小白同学们暂且忘记这些。我来告诉你们什么才是RxJava的启蒙。
简单入门
我们要知道的第一个概念叫做"Single",中文翻译叫一个或者单一的,也可以作为单身的意思,想必你就是单身吧....
第二个概念叫做"just",在RxJava中Single的just支持传入任意参数。使用方法如:
Single.just(T value);
知道这段代码,就可以说RxJava你要入门了噢。你想想看我们为啥要用RxJava,主要是为了异步处理一些任务。通常情况的异步任务都是什么样子的呢?传入一些值,经过逻辑处理之后返回结果。
以上代码中的just(T value),可以看作是传入参数。
有了输入,如何输出呢?我们讲到了异步处理,那自然得有一个异步回调才行吧,这简直理所应当吧。所以这里介绍最重要的"subscribe"概念。有的同学会疑惑说“你不是说不讲subscribe的嘛”。嗯咳,不讲不行嘛,谁叫人家是最重要的“订阅概念”呢。我只是让你们学习之前先忘记那些让人混淆的概念,从新开始嘛。废话少说,看代码
Single.just(T value).subscribe();
例如:
int addValue(int a, int b) {
return a + b;
}
// ps:不管是否subscribe(),只要使用just(),addValue()都会执行
Single.just(addValue(1, 2)).subscribe();
异步任务知道如何执行了,那咱们该想办法拿到回调了!
Single.just(addValue(1, 2)).subscribe(new SingleSubscriber<Integer>() {
@Override
public void onSuccess(Integer value) {
// value = 3
}
@Override
public void onError(Throwable error) {}
});
这里我们使用一个叫SingleSubscriber的对象接收回调,作为入门只要记住就好了。其中onSuccess(T value)会接收just(T value)的传入值,当addValue()方法抛异常的时候会自动调用onError()。
通过以上的例子,我们学会了如何简单的传入参数,并且输出结果。但是这里我并不想误导大家,这里仅仅只是RxJava的开始,其中just()方法无论如何都只会在当前线程里执行。所以即使看上去有异步的过程,但其实这是个同步的过程!
Single高阶
接下来我们会很深入的讲解Single的用法了。主要介绍Single的操作符,用一星~三星表示重要程度,三星表示很常用,一星表示了解就行。按首字母顺序介绍。
compose操作符[*]
创建一个自定义的操作符,将某种范型的Single转换为另一种范型一般用不到,示例如下,将Integer的Single转为String Single。
Single.just(addValue(1, 2))
.compose(new Single.Transformer<Integer, String>() {
@Override
public Single<String> call(Single<Integer> integerSingle) {
return integerSingle.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer + 2);
}
});
}
})
.subscribe(new SingleSubscriber<String>() {
@Override
public void onSuccess(String value) {
// value = 5
}
@Override
public void onError(Throwable error) {}
});
这个例子就看起来很复杂了吧,不过不用担心,这个场景很少用到,这里只是简单介绍一下:之前的例子中我们创建了一个Integer的Single其中返回值是一加二的结果三,但是我们其实需要的结果并不是整型的三是字符串三又或是其他类型的对象,这个时候我们就需要结合map()操作符转换一下Single。map()操作符我们只会会讲,只要知道它可以用来转换类型就行。这里值得注意的是compose()方法可以指定线程运行,即可以指定Schedulers。如果不懂不要担心,以后还会介绍。
concat操作符[*]
用来连接多个Single和Observable发射的数据。
Single.concat(Single.just(checkNetwork()), Single.just(checkMemory()), Single.just(doSth()))
仅仅用来连接Single顺序执行的,比如顺序执行检查网络,检查内存,执行任务,注意:如果某个Single调用了onError()会导致被中断。
create操作符[***]
// 作用同Single.just(addValue(1, 2));
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
singleSubscriber.onSuccess(addValue(1, 2));
}
});
// 常见的示例,这是一个异步操作
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
// 这里被指定在IO线程
singleSubscriber.onSuccess(addValue(1, 2));
}
}).subscribeOn(Schedulers.io())// 指定运行在IO线程
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Integer o) {
// o = 3
}
});
值得注意的是之前我们使用的just()是一种特殊的create(),它不能指定Schedulers。只能在当前线程中运行,而create()可以指定Schedulers实现异步处理。且just()不管是否被subscribe()订阅均会被调用,而create()如果不被订阅是不会被调用的。所以我们通常可以用just()传递简单参数,而用create()处理复杂异步逻辑。
error操作符[*]
返回一个立即给订阅者发射错误通知的Single,一般用于调试,不常用。
// 如人为让concat中断:
Single.concat(s1, Single.error(new Throwable("error"), s2)).subscribe();
flatMap操作符[*]
flatMap<T,R>基本上等同于map(),唯一的区别在于flatMap的R一般用于返回Observable对象,这样随后的subscribe参数可以使用原始类型。详见代码
Single.just(1).flatMap(new Func1<Integer, Single<String>>() {
@Override
public Single<String> call(Integer x) {
return Single.just(x + "");
}}).subscribe(new Action1<String>() {// 注意这里返回值的区别
@Override
public void call(String s) {
LogHelper.e("_flatMap:"+s);
}});
Single.just(1).map(new Func1<Integer, Single<String>>() {
@Override
public Single<String> call(Integer x) {
return Single.just(x + "");
}}).subscribe(new Action1<Single<String>>() {
@Override
public void call(Single<String> s) {// 注意这里返回值的区别
LogHelper.e("_flatMap:"+s);
}});
一般map()是用于一对一的返回,而flatMap()用于一对0~多的返回。比如我们看下面这个例子:
static Observable<List<String>> query() {
List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
return Observable.just(s);
}
// 注意这里的参数是 query所返回的Observable的输出,并且返会一个Observable<String>
query().flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
//结合from处理 return Observable.from(strings);
}}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("_flatMap:"+s);
}
});
输出:
_flatMap:Java
_flatMap:Android
_flatMap:Ruby
_flatMap:Ios
_flatMap:Swift
这里传入了一个List<String>,传出了多个String。而且应该多用于Observable,很少用在Single中,即使用也不如map()来的爽快,这里只做了解即可。
flatMapObservable操作符[**]
刚刚说到flatMap()和map()类似,区别在于flatMap可以返回多个值,而map只能返回一个。但在Single中flatMap只能返回Single,几乎等同于map实用性不高。而flatMapObservable就不同了,它支持将Single转化为Observable对象,可以返回多个值。下面这个例子介绍如何将Single转化为Observable。
Single.just(1).flatMapObservable(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just("H", "3", "c");
}}).subscribe(new Action1<String>() {// 注意这里返回值的区别
@Override
public void call(String s) {
LogHelper.e("kk:"+s);
}
});
这里传入一个整型1,输出"H","3","c"三个字符串。
from操作符[*]
Single的from操作符仅允许传入一个java Future对象,由于java Future几乎很少使用,所以该操作符在Single中没什么实用意义。
map操作符[***]
Single.just(1).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "x";
}}).subscribe(new Action1<String>() {// 注意这里返回值的区别
@Override
public void call(String s) {
LogHelper.e("kk:"+s);
}
});
map操作符之前有介绍过,用于类型一对一转换,比较简单。
merge & mergeWith操作符[*]
merge操作符类似于concat,他们的区别见下图
concat img
merge img
subscribeOn操作符[***]
用于指定异步任务的线程,常见的有:
Schedulers.computation( );// 计算线程
Schedulers.from(executor);// 自定义
Schedulers.immediate();// 当前线程
Schedulers.io();// io线程
Schedulers.newThread();// 创建新线程
Schedulers.trampoline();// 当前线程队列执行
onErrorReturn操作符[***]
相当于try catch中的return,具体意思就是当函数抛出错误的时候给出一个返回值,看代码:
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
singleSubscriber.onError(new Throwable("x"));
}}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 2;
}}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer s) {
LogHelper.e("kk:"+s);
}
});
返回结果2
observeOn
指定回调所在线程
// 常见的为,即Android UI线程
AndroidSchedulers.mainThread();
timeout操作符[***]
超时操作操作,在指定时间内如果没有调用onSuccess()就判定为失败,且可支持失败的时候调用其他Single()
toSingle操作符[*]
将传入一个参数的Observable转换为Single
Observable.just(1).toSingle();
zip & zipWith操作符[**]
如果说flatMap()是将一个Single变成多个的操作,那么zip刚刚相反,他可以将多个Single整合为一个
Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer o, Integer o2) {
LogHelper.e("A:" + o + "=" + o2);
return null;
}}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
LogHelper.e("kk:"+s);
}
});
RxJava Single的用法就全部讲完了,你学会了吗?如果还不会....那我告诉你Single的万金示例吧....
Single.just(1)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer + "";
}
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// result
}
});
用这个例子足以应付大部分的一入一输出的异步任务了。
下篇我们会介绍Subject。
网友评论
Single.just(addValue(1, 2)).subscribe(...);
例子中是用addValue(1, 2)的返回值,放到just()中,为何它抛的异常不是到当前上下文,而是被这个single获取呢?如果是Single.create((a) -> {a.next(addValue(1, 2))}) 那我觉得还能理解
Single<String> single = Single.create(new OnSubscribe<String>() {
@Override
public void call(SingleSubscriber<? super String> subscriber) {
subscriber.onSuccess("Tom");
}
});
SingleSubscriber<String> singleSubscriber = new SingleSubscriber<String>() {
@Override
public void onSuccess(String value) {
// TODO Auto-generated method stub
System.out.println("onSuccess value = " + value);
}
@Override
public void onError(Throwable error) {
System.out.println("onError error : " + error);
}
};
single.subscribe(singleSubscriber);
System.out.println("singleSubscriber.isUnsubscribed() ? " + singleSubscriber.isUnsubscribed());
}
运行结果:
onSuccess value = Tom
singleSubscriber.isUnsubscribed() ? false
官网说:Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
而代码运行后,为什么singleSubscriber没有取消订阅呢?
@Override
public String call(Integer integer) {
Log.d("RxJava","Func1.thread.id"+Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ""+(integer+1);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("RxJava","Action1.thread.id"+Thread.currentThread().getId());
}
});
Log.d("RxJava","onClick1Event.after:"+Thread.currentThread().getId());
//=================================================
06-23 08:39:33.481 23706-23706/com.coolstar.rxjavatest D/RxJava: onClick1Event.after:1
06-23 08:39:33.486 23706-26035/com.coolstar.rxjavatest D/RxJava: Func1.thread.id4202
06-23 08:39:34.491 23706-26035/com.coolstar.rxjavatest D/RxJava: Action1.thread.id4202
//==================================================
为什么我写的observeOn(AndroidSchedulers.mainThread())没起作用?
按作者原来的写法,Func在我这也是在1主UI线程中执行的。
Single.just(1).map(integer -> {
showCurrentThread("map function call()");
return integer + "";
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
// result
showCurrentThread("onSuccess");
});
}
private void showCurrentThread(String tag) {
Log.i(tag, " thread = " + Thread.currentThread());
}
运行结果:
I/map function call(): thread = Thread[main,5,main]
I/onSuccess: thread = Thread[RxCachedThreadScheduler-1,5,main]
是博主说的subscribOn和observeOn反了还是我哪里理解错了?