RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
RxJava的观察者模式
如果不知道观察者设计模式的话,建议先传送到这里:
RxJava它有四个概念
- Observer(观察者)
- Observable(被观察者)
- subscribe(订阅)
- 事件
以上的几个概念和普通观察者模式基本一样,但是RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
这两个事件都是在事件结束的时候一种标记,并且必须有且只有一个,
onCompleted() 标记事件流的完成,onError() 标记事件的异常结束,终止其他事件的发出。
RxJava的基本使用
根据以上的概念,RxJava的基本使用如下:
1.创建观察者 -Observer
观察者会对被观察者的事件触发做出响应。比如被观察者 发起onNext 事件就会回调观察者的 onNext 方法。但在发起onNext事件之前,被观察者需要订阅观察者,这个时候就会回调 onSubscribe 方法。而 onError 和 onComplete 方法分别是在被观察中发起 onComplete 和 onError 事件时调用,关于这两者的区别,刚刚提过。
public Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: ->"+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: Complete");
}
};
}
2.被观察者 -Observable
public Observable<String> getObservable(){
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("吃饭");
emitter.onNext("睡觉");
emitter.onNext("打豆豆");
emitter.onComplete();
}
});
}
这个方法里的内容也可以这么写
Observable<String> observable = Observable.just("吃饭","睡觉","打豆豆");
Observable<String> observable = Observable.fromArray("吃饭","睡觉","打豆豆");
3.订阅-subscribe
创建好了 Observer 和 Observable,需要用订阅关系也就是 subScribe方法 将它们建立连接,
Observable<String> observable = getObservable(); //创建被观察者
Observer<String> observer = getObserver(); //创建观察者
observable.subscribe(observer); //订阅
4.一步式写法
如果感觉上面的写法有点麻烦也可以用下面的方法,一步写成
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("吃饭");
emitter.onNext("睡觉");
emitter.onNext("打豆豆");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: ->"+s);
sb.append(s);
contentText.setText(sb+"->");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
}
});
在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。如果不指定线程的话,在哪个线程调用 subscribe(),就在哪个线程生产事件。上面的例子它就是在主线程中,所以在 onNext 方法里,我对UI进行了更新是没有抛异常的。这样它实现出来的只是一个同步的观察者模式。
但但但,,,但是异步对于 RxJava 是及其重要的
RxJava 的线程控制
RxJava的异步,比如进行网络请求(在io线程中)后更新ui(主线程)操作,这就要对 RxJava 的线程进行控制。RxJava 的线程控制需要用到调度器-Scheduler
RxJava 内置了几个调度器:
- Schedulers.immediate():这是默认的 Scheduler。它直接在当前线程运行,不指定线程。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。它和
- newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行,由于更新ui。
下面我们用一个栗子来学习调度器的用法:
进行一个网络请求(需要切换到io线程中),获得返回的请求数据并更新UI(切断到主线程)
添加以下依赖库
implementation 'io.reactivex.rxjava2:rxjava:2.2.5'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'io.reactivex:rxandroid:1.2.1'
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.5.0'
这里的网络请求我们借助Retrofit框架
关于Retrofit这里就不介绍了,在后面笔者会关于它单独写一篇
定义一个接口
public interface Books {
@GET("chaxunyuyue/")
Call<List<BookInfo>> getBookInfo(@Query("username") String username);
}
针对返回的数据写一个javaBean
public class BookInfo {
private String yuyue_address;
private String yuyue_reason;
private String yuyue_time;
//....省略所以getter和setter方法
}
开始网络请求并更新ui
private void queryBookInfo(final String username){
String url = "http://smpark.chzu.edu.cn:8081/ipv6/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(url)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
final Books books = retrofit.create(Books.class);
Observable.create(new ObservableOnSubscribe<BookInfo>() {
@Override
public void subscribe(ObservableEmitter<BookInfo> emitter) throws Exception {
List<BookInfo> bookInfos = books.getBookInfo(username).execute().body();
Log.d(TAG, "subscribe: " + bookInfos.get(0).getYuyue_address());
BookInfo bookInfo = bookInfos.get(0);
emitter.onNext(bookInfo);
}
})
.subscribeOn(Schedulers.io()) // 1.指定 subscribe() 发生在 IO 线程
.subscribe(new Observer<BookInfo>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(BookInfo bookInfo) {
// Log.d(TAG, "onNext: " + bookInfo.getYuyue_reason());
String bookAddress = bookInfo.getYuyue_address();
Log.d(TAG, "onNext: " + bookAddress);
tvContent.setText("地址 :" + bookAddress);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.toString());
e.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
上面代码我用了一段式写法,在创建被观察者之后使用了调度器
.subscribeOn(Schedulers.io())
这样就把subscribe() 调度到 IO 线程中执行,开始我以为调度的同时也会把回调的结果 onNext() 等调度到IO线程,看别人的博客都会加上这样一句话
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
但是发现我更新ui的时候并没有抛异常,所以我觉得它没有将回调结果放于io线程中也有可能它自动切换到主线程中了。
总结
RxJava 的本质可以理解为异步这一个词,这篇博客笔者主要介绍了Rxjava的基本使用和线程控制,在下一篇博客,笔者会介绍它的其他内容。
网友评论