前言
最近诸事缠身,有点忙,终于抽出时间&&有兴致写第三了,有一股深深的罪恶感,废话不多说,还是直接接上篇的活了,讲一讲一些比较常用的操作符吧。国际惯例,先丢两个传送门。
我所理解的RxJava——上手其实很简单(一)
我所理解的RxJava——上手其实很简单(二)
然后这篇讲RxJava中强大的Scheduler调度器 ,就是因为它,RxJava才能极其简便的在线程中切换,接着再讲一讲一些常用的操作符,比较简单容易理解的操作符都在本篇罗列出来,以后可能不定时更新这篇文章,复杂的操作符后续分篇讲。
Scheduler
在讲常用操作符前,先看看Scheduler这个东西,名之为调度器,正因为有这个东西,让RxJava可以从主线程和子线程之间轻松切换,各个Scheduler的具体使用效果看以下表解释:
| 调度器类型| 用途|
| -------------|-------------|-----|
| Schedulers.computation( ) |用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量|
| Schedulers.from(executor) | 使用指定的Executor作为调度器|
|Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
|Schedulers.io( )|用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器|
|Schedulers.newThread( )|为每个任务创建一个新线程|
|Schedulers.trampoline( )|当其它排队的任务完成后,在当前线程排队开始执行|
|AndroidSchedulers.mainThread()|此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上|
具体如何使用呢,比如从数据库读取数据更新到UI上,假设数据量很大,直接从主线程读取数据,会造成UI卡顿,以前我们常用AnsyTask或者Handler去处理避免出现这类问题,个人认为手写个AnsyTask还是挺麻烦的,但用RxJava就简单多了,例如:
Observable.create(new Observable.OnSubscribe<Data>() {
@Override
public void call(Subscriber<? super Data> subscriber) {
Data data = getData();//从数据库获取
subscriber.onNext(data);
subscriber.onCompleted();
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Data>() {
@Override
public void call(Data data) {
//更新ui
}
});
简单粗暴的解释一下,subscribeOn( )
决定了发射数据在哪个调度器上执行,observeOn(AndroidSchedulers.mainThread())
则指定数据接收发生在UI线程,简直不要太方便。
常用操作符
-
Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。
例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。
Observable.just(filePath).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String path) {
return getBitmapByPath(path);
}}).subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//获取到bitmap,显示
}});
例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。
Observable.just("12345678").map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.substring(0,4);//只要前四位
}})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("mytag",s);
}});
先说明一下,为了方便理解,所以写的例子都比较简单,不要以为明明可以简单用if-else解决的事,没必要用这种方式去写,当你真正将这些操作符使用到数据处理中去的时候,你就会发现有多方便。
-
FlatMap:和Map很像但又有所区别,Map只是转换发射的数据类型,而FlatMap可以将原始Observable转换成另一个Observable。还是举例说明吧。假设要打印全国所有学校的名称,可以直接用Map:
为了更清晰一点,先贴一下School类:
public class School {
private String name;
private List<Student> studentList;
public List<Student> getStudentList() {
return studentList;
}
public void setStudentList(List<Student> studentList) {
this.studentList = studentList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public static class Student{
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
接着用Map打印学校名称:
List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, String>() {
@Override
public String call(School school) {
return school.getName();
}}).subscribe(new Action1<String>() {
@Override
public void call(String schoolName) {
Log.i("mytag",schoolName);
}});
再进一步,打印学校所有学生的姓名,先考虑用Map实现,将所有School对象直接转成Student:
Observable.from(schoolList).map(new Func1<School, School.Student>() {
@Override
public School.Student call(School school) {
return school.getStudentList();
}}).subscribe(new Action1<School.Student>() {
@Override
public void call(School.Student student) {
Log.i("mytag",student.getName());
}});
看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方
@Override
public School.Student call(School school) {
return school.getStudentList(); //错误,Student 是一个对象,返回的却是一个list
}
所以用Map是无法实现直接打印学校的所有学生名字的,因为Map是一对一的关系,无法将单一的School对象转变成多个Student。前面说到,FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用from()
操作符把school.getStudentList()
变成另外一个Observable问题不就迎刃而解了吗,这时候就该FlatMap上场了,来看看它是怎么实现的:
Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {
@Override
public Observable<School.Student> call(School school) {
return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去
}}).subscribe(new Action1<School.Student>() {
@Override
public void call(School.Student student) {
Log.i("mytag",student.getName());
}});
Map和FlatMap在我看来就像孪生兄弟一样,非常实用,实际开发中也我也经常使用,个人觉得要想上手RxJava,掌握这两个操作符必不可少。
- Buffer:缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:
Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> list) {
Log.i("mytag","size:"+list.size());
}});
运行打印结果如下:
11-02 20:49:58.370 23392-23392/? I/mytag: size:2
11-02 20:49:58.370 23392-23392/? I/mytag: size:1
在开发当中,个人经常将Buffer和Map一起使用,常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如下:
List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, School>() {
@Override
public School call(School school) {
school.setName("NB大学"); //将所有学校改名
return school;
}}).buffer(schoolList.size()) //缓存起来,最后一起发送
.subscribe(new Action1<List<School>>() {
@Override
public void call(List<School> schools) {
}});
- Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:
Observable.from(schoolList).take(4).map(new Func1<School, School>() {
@Override
public School call(School school) {
school.setName("NB大学");
return school;
}}).buffer(4).subscribe(new Action1<List<School>>() {
@Override
public void call(List<School> schools) {
}});
- Distinct:去掉重复的项,比较好理解:
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println("Next: " + item);
}
});
输出
Next: 1
Next: 2
Next: 3
- Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据:
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println("Next: " + item);
}});
输出:
Next: 1
Next: 2
Next: 3
这一篇就先讲这么多吧,重点掌握Map和FlatMap操作符,因为真的很实用、很实用、很实用,重要的事讲三遍。
后语
其实,讲真,到现在RxJava已经非常普遍,用的人很多,网上的干货也很多,写这几篇文章呢,除了帮助想开始学RxJava的新手入门,最重要的还是起到一个笔记的作用吧,好记性不如烂笔头,这是真的。好吧,这篇就写这么多了,欢迎指出问题,互相交流学习。
更多精彩文章请关注微信公众号"Android经验分享":这里将长期为您分享Android高手经验、中外开源项目、源码解析、框架设计和Android好文推荐!
QQ交流群:Android经验分享一区 386067289
网友评论
Executor executor = Executors.newSingleThreadExecutor();
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(executor))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.outWithThreadName(integer);
}
});
打印结果:
pool-1-thread-1, 1
pool-1-thread-1, 2
pool-1-thread-1, 3
pool-1-thread-1, 4
但是这样却没有输出结果,
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.outWithThreadName(integer);
}
});
💔
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "observeOnThread");
t.setDaemon(true);
return t;
}
});
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(executor))
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println(Thread.currentThread().getName() + "-------" + integer);
}
});
System.out.println("over");