叫这个题目也是因为这篇博客写了太久太久了!有段时间都觉得完全没有必要写下去的,索性终于完工了,也算是对这段时间的肯定吧!
RxJava基本概念
RxJava
有四个基本概念:Observable
(可观察者,即被观察者)、 Observer
(观察者)、 subscribe
(订阅)、事件。Observable
和 Observer
通过 subscribe()
方法实现订阅关系,从而 Observable
可以在需要的时候发出事件来通知 Observer
。
接下来就围绕Observable创建、Observer创建、线程切换、事件类型转换、订阅和取消订阅展开。
Observerble的基本创建方式:
1、create()//最基本的
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("xxixxii");
subscriber.onCompleted();//这里必须调用该方法或者onError(),通知订阅者发送完毕,否者无法进行解除订阅。
}
});
2、form()//适配集合等
ArrayList<Student> students = new ArrayList<>();
students.add(s1);
students.add(s2);
students.add(s3);
students.add(s1);
students.add(s4);
students.add(s5);
students.add(s6);
Observable.from(students)
3、just()//适配已经写好的方法
Student s1 = new Student(19, "xiaoqiang");
Student s2 = new Student(19, "xiaoqiang1");
Student s3 = new Student(20, "xiaoqiang1");
Student s4 = new Student(19, "xiaoqiang");
Student s5 = new Student(21, "xiaoqiang");
Student s6 = new Student(22, "xiaoqiang");
Observable.just(s1, s2, s3, s4, s5, s6)
4、merge(o1,02)//将多个合并为一个
5、concat(o1,o2)//one by one emit!
Observer Subscriber的创建
Subscriber<String> stringSubscriber = new Subscriber<String>() {
@Override
public void onStart() {
Log.e(TAG, "onStart: ");
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: " + s);
}
};
Observer<String> stringObserver = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
这里需要注意:Observer
和Subscriber
不仅基本使用方式一样,实质上,在 RxJava
的 subscribe
过程中,Observer
也总是会先被转换成一个 Subscriber
再使用。所以如果你只想使用基本功能,选择 Observer
和 Subscriber
是完全一样的。它们的区别对于使用者来说主要有两点:
onStart()
: 这是Subscriber
增加的方法。它会在subscribe
刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),onStart()
就不适用了,因为它总是在subscribe
所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用doOnSubscribe()
方法,具体可以在后面的文中看到。
unsubscribe()
: 这是Subscriber
所实现的另一个接口Subscription
的方法,用于取消订阅。在这个方法被调用后,Subscriber
将不再接收事件。一般在这个方法调用前,可以使用isUnsubscribed()
先判断一下状态。unsubscribe()
这个方法很重要,因为在subscribe()
之后,Observable
会持有Subscriber
的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如onStop()
、onDestory()
等方法中)调用unsubscribe()
来解除引用关系,以避免内存泄露的发生。
强大的条件筛选
说了这么多没用的东西,肯定要来点儿实际的才能体会到RxJava的强大功能!
1、take( )
只发送指定数量的事件。
2、filter( )
过滤指定条件的事件。
3、first()
只发送第一个事件。
4、distinct( )
只发送不同的事件。(怎么定义为不同?!)
其实还有很多。。。
随时随地线程切换
说完创建过滤你可能觉得这也没撒嘛!那么接下来想想之前在Android开发里要切换线程需要怎么处理呢?view.post()
或者使用handler.sendMessage()
!而在RxJava中,线程切换不用这么搞了,Schedulers
是RxJava中用来管理相关线程调度的,基于订阅和被订阅,这里有两个方法!
1、subscribeOn()
事件产生在哪个线程。
2、observeOn()
事件消费在哪个线程。
3、Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
4、Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。
5、Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
。其行为模式和 newThread()
是差不多滴,但是区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。
6、Schedulers.computation()
: 计算所使用的 Scheduler。这个计算指的是CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()中,否则 I/O 操作的等待时间会浪费 CPU。
7、AndroidSchedulers.mainThread()
:Android 特供,它指定的操作将在 Android 主线程运行。
transform 转换
强大的内部转换功能,让你可以做到要什么就是什么。
1、map()
:进行对象转换,不会创建新的Observable
。
2、flatMap()
:也是进行对象转换,会创建新的Observable
。
3、buffer()
、:缓冲区,缓冲指定的Observable
包装成新的
4、Observable
发射。
5、toList()
:将单个的对象转换为集合。
取消订阅
爽了之后重视要记住一件事,那就是要释放相关资源!不然后果也是很严重的,尤其是在使用RxView
相关的方法时会警告你需要调用Unsubscribe()
来释放相关的引用。
释放操作其实很简单。定义一个集合维护相关的Subscription
,然后在Activity
的onStop()
或者onDestroy()
方法中释放相关资源。
Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
.throttleFirst(1, TimeUnit.SECONDS)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "clicks->doOnUnsubscribe");
}
})
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
methd6();
}
});
//维护相关的资源引用
subscriptions.add(clickSubscribe);
@Override
protected void onDestroy() {
for (Subscription s : subscriptions) {
if (!s.isUnsubscribed()) {
s.unsubscribe();
Log.e(TAG, "onDestroy: 取消订阅!");
}
}
super.onDestroy();
}
手动create()
一个Observable
的话,一定要调用 onComplete()
或者onError()
来结束这个事件,不然资源也不会被释放的。
动手时间
练习一
统计集合中年龄大于20的学生姓名(年龄姓名一致的视为同一个!)
首先当然是创建Observable
!
//初始化数据
Student s1 = new Student(19, "xiaoqiang0");
Student s2 = new Student(20, "xiaoqiang1");
Student s3 = new Student(21, "xiaoqiang2");
Student s4 = new Student(22, "xiaoqiang3");
Student s5 = new Student(23, "xiaoqiang4");
Student s6 = new Student(24, "xiaoqiang5");
Student s7 = new Student(25, "xiaoqiang6");
Student s8 = new Student(25, "xiaoqiang5");
students = new ArrayList<>();
students.add(s1);
students.add(s2);
students.add(s3);
students.add(s1);
students.add(s4);
students.add(s5);
students.add(s6);
students.add(s7);
students.add(s8);
Observable.just(students)//创建Observable
.flatMap(new Func1<ArrayList<Student>, Observable<Student>>() {
@Override
public Observable<Student> call(ArrayList<Student> students) {
//变换为新的Observable
return Observable.from(students);
}
})
//过滤掉年龄和姓名相同的对象
.distinct()
//过滤掉年龄小于20的对象
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getAge() >= 20;
}
})
//将事件对象由Student 转换为 String
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: 取消订阅了!!");
}
})
//订阅
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: " + s);
}
});
E/MainActivity: onNext: xiaoqiang1
E/MainActivity: onNext: xiaoqiang2
E/MainActivity: onNext: xiaoqiang3
E/MainActivity: onNext: xiaoqiang4
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: onNext: xiaoqiang6
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: call: 取消订阅了!!
PS:至于这里对象的唯一性判断是通过复写equal()
和hashCode()
来实现的!
@Override
public boolean equals(Object o) {
return o instanceof Student && this.getAge() == ((Student) o).getAge() && this.getName().equals(((Student) o).getName());
}
@Override
public int hashCode() {
return Arrays.hashCode(new Object[]{getAge(), getName()});
}
练习二
concat()
使用:本地有缓存读取本地的数据,没有走网络请求并缓存到本地。
Observable<String> netObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "走网络了!!");
subscriber.onNext("这是缓存数据!!");
subscriber.onCompleted();
}
}).doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: 保存数据到本地");
rxPreferences.getString("cash").asAction().call(s);
}
});
Observable<String> nativeObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (TextUtils.isEmpty(rxPreferences.getString("cash").get())) {
Log.e(TAG, "没有缓存,走网络!");
subscriber.onCompleted();
} else {
Log.e(TAG, "有缓存!");
subscriber.onNext(rxPreferences.getString("cash").get());
subscriber.onCompleted();
}
}
});
Observable.concat(nativeObservable, netObservable)
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成了!");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "错误了!");
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
});
//第一次
E/MainActivity: 没有缓存,走网络!
E/MainActivity: 走网络了!!
E/MainActivity: call: 保存数据到本地
E/MainActivity: 这是缓存数据!!
E/MainActivity: 完成了!
//第二次
E/MainActivity: 有缓存!
E/MainActivity: 这是缓存数据!!
E/MainActivity: 完成了!
练习三
merge()
使用:服务端和本地都有相关数据,汇总展示。使用merge()对应的事件顺序是无序的,谁先产生了谁就先发送!
Observable<String> just = Observable.just("S", "O", "S")
.subscribeOn(Schedulers.newThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
SystemClock.sleep(20);
}
});
Observable<String> just1 = Observable.just("S","T","R").subscribeOn(Schedulers.newThread())
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
SystemClock.sleep(20);
}
});
Observable.merge(just1, just)
.subscribeOn(Schedulers.newThread())
.distinct()
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: " + s);
}
});
E/MainActivity: call: S
E/MainActivity: call: T
E/MainActivity: call: R
E/MainActivity: call: O
//或者这样
E/MainActivity: call: S
E/MainActivity: call: T
E/MainActivity: call: O
E/MainActivity: call: R
练习四
RxView和RxCompoundButton的使用:
RxCompoundButton.checked(checkBox).call(rxPreferences.getBoolean("checked").get());
//noinspection ConstantConditions
Subscription checkedSubscription1 = RxCompoundButton.checkedChanges(checkBox)
.subscribe(rxPreferences.getBoolean("checked").asAction());
subscriptions.add(checkedSubscription1);
Subscription checkedSubscription = rxPreferences.getBoolean("checked")
.asObservable()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "rxPreferences->doOnUnsubscribe");
}
})
.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.e(TAG, "rxPreferences->onNext: +onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.e(TAG, "rxPreferences->onNext: " + aBoolean);
}
});
subscriptions.add(checkedSubscription);
练习五
RxJava和Retrofit的搭配使用:
Subscription beforeSubscribe = BaseDataManager.getDailyApiService()
.getBeforeDailyStories(date)
.subscribeOn(Schedulers.io())//事件产生在子线程
.doOnSubscribe(new Action0() {//subscribe之后,事件发送前执行。
@Override
public void call() {
isLoadingMore = true;
Log.e("TAG", "call: true");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<DailyStories>() {
@Override
public void call(DailyStories dailyStories) {
mView.onLoadMore(dailyStories);
mView.isLoadingMore(false);
Log.e("TAG", "call: false");
isLoadingMore = false;
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
mView.onLoadError(throwable.toString());
isLoadingMore = false;
}
});
subscribe(beforeSubscribe);
相关回调
对于Observable,这里有一系列的回调方法,作用在不同的时期,其中常用的是doOnSubscribe()
和doOnNext()
。
另外在Subscriber里,还有一个onStart()
的方法!
Observable.just("L", "O", "V", "E")
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnSubscribe");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnUnsubscribe");
}
})
.doOnEach(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "doOnEach: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "doOnEach: onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, "doOnEach: onNext:"+s);
}
})
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: doOnNext");
}
})
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(TAG, "call: doOnRequest");
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doOnTerminate");
}
})
.doAfterTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "call: doAfterTerminate");
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "subscribe->call: onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "subscribe->call: onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, "subscribe->call: onNext:" + s);
}
});
E/MainActivity: subscribe->call: onStart:
E/MainActivity: call: doOnRequest
E/MainActivity: call: doOnSubscribe
E/MainActivity: doOnEach: onNext:L
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:L
E/MainActivity: doOnEach: onNext:O
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:O
E/MainActivity: doOnEach: onNext:V
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:V
E/MainActivity: doOnEach: onNext:E
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:E
E/MainActivity: doOnEach: onCompleted
E/MainActivity: call: doOnTerminate
E/MainActivity: subscribe->call: onCompleted
E/MainActivity: call: doOnUnsubscribe
E/MainActivity: call: doAfterTerminate
通过Log可以看到一个事件的订阅过程。
首先回调Subscriber.onStart()
表明Observable
和Subscriber
已经建立了连接,但是这个时候事件还没有开始发射。
然后是 doOnRequest()
和doOnSubscribe()
,这个时候事件也没有开始发射。
接着是doOnEach()
和doOnNext()
和onNext()
,这个时候事件正式发射了。
最后发射完了,就是onCompleted
、doOnTerminate()
,然后取消订阅释放资源doOnUnsubscribe()
。
Rx全家桶
欢迎加入Rx全家桶豪华套餐,只有你想不到的,没有它做不到的!
Rx全家桶小结
总的来说,使用RxJava之后,可以让我们的代码更加清晰一目了然,而且线程的切换也非常的简单,不用自己维护相关线程和写那该死的Runnable
,内部强大的事件转换和筛选过滤也是为我们开发省去了不少的工作。
参考文档
1、给 Android 开发者的 RxJava 详解
2、RxJava wiki
3、RxJava使用场景小结
4、Demo下载
---- Edit By Joe ----
网友评论
if (!s.isUnsubscribed()) {
s.unsubscribe();
Log.e(TAG, "onDestroy: 取消订阅!");
}
}
是否可以优化成使用CompositeSubscription;
只需要mSubscriptions = new CompositeSubscription();
mSubscriptions.add(subscription);
...//在创建的时候添加进去
mSubscriptions.clear();//在onDestroy等方法里调用就行了
作者的很干货~学到了东西~只是些许建议,加油↖(^ω^)↗