作者: 一字马胡
转载标志 【2017-12-13】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-12-13 | RxJava学习笔记系列 | 系列笔记 (一) |
2017-12-15 | RxJava 学习笔记 (二) | 增加系列笔记(二) |
2017-12-15 21:36 | 考虑到RxJava很大程度上用于android开发中,而我自身不是移动开发者,所以暂时将RxJava学习笔记系列挂起,在未来需要使用RxJava的时候再继续学习,并且结合实际的应用来学习会收获更多 | 挂起 |
导入
其实在很早以前就接触过RxJava,并且当时学习RxJava还有一个产出:JSwitcher,这是一个基于RxJava的实验性框架,对于该框架的介绍可以参考下面的描述:
JSwitcher is a Convenient tool to switch schedule base on RxJava, and Jswitcher also implement a sample Version Observer/Observable, you can learn how RxJava works from the sample codes. it's easy to switch to Another schedule from current schedule. you just need to care about your bussiness, using 'switchTo' Operator to switch to the longing schedlue when you want to do the work on the suitable schedule. There are some especial schedules for you, like I/O Bound Schedule, Cpu Bound Schedule, And Single Schedule, etc, if you want to create an extra schedule by yourself, it's ok for JSwitcher, and it's very easy to do this .And the most important thing is the jswitch support 'chain operator', that means you can switch to a schedule, then fit on this schedule some works, then you can do switch operator continue from current position, or you can just fit another work on current schedule, and jswitcher has terminal operator like 'waitAndShutdown', after do the operator, you can not do 'chain operator' anymore. and the jswitcher will wait some time and shutdown all of schedule.
该框架将RxJava的核心部分抽离出来并做了一些简化处理,说到这里,需要提及一下,将一个复杂框架中的某部分抽象出来看似很简单,但是实际操作起来还是有一些困难的,并且在实际操作的过程中为了不涉及过多外围的内容时常需要简化,就是将一些依赖外围的核心部分中的某些内容抛弃,但是最为主要的骨架不能丢掉,这样操作下来会对整个框架有一定的了解。如果上面的描述激起了你的兴趣,可以实际去阅读JSwitcher框架代码,也可以作为快速入门RxJava的学习材料,但是该框架存在一些不确定性以及一些待研究正确性的点,所以不宜在实际项目中应用。
JSwitcher的核心功能是实现线程池的切换,并且支持按任务性质(I/O,Compute)来划分线程池,切换到合适的线程池可以提交任务,具体的使用可以参考下面的例子:
SwitcherFitter.switcherFitter()
.switchToIoSchedule() //switch to i/o bound schedule
.switchToSingleSchedule() //switch to single schedule
.fit(normalRunner, future1, true) //do the normal runner at current schedule
.switchToComputeSchedule() // switch to cpu bound schedule
.fit(normalRunner, future2, true) // do
.fit(timeoutRunner, future3, true) // do
.switchToSingleSchedule() //switch
.switchToSingleSchedule() //switch
.fit(timeoutRunner, future4, true) //do
.awaitFuturesCompletedOrTimeout(100,
completableFutures, timeoutFutures, 10) //wait for the future
.switchToComputeSchedule() //switch
.fit(() -> {
System.out.println("i am a tester->" + Thread.currentThread().getName());
}) // do the stupid work
.waitAndShutdown(1000); //wait and shutdown !
关于JSwitcher的设计,可以参考下面的图片:
本文作为学习RxJava的学习笔记的第一篇文章,会从RxJava的一些核心概念出发,并且从实际的例子来梳理RxJava的实现原理,当然,为了阅读的流畅性,每一篇文章不会涉及太多的内容。需要说明的一点是,本文乃至本系列的所有文章都是基于RxJava2,RxJava目前有两个版本,一个是RxJava1,一个是RxJava2,据说两个版本间的差别还是很大的,介于我的学习都是基于RxJava2的,并且没有接触过RxJava1,所以本系列文章不会涉及RxJava1与RxJava2的对比内容,所有内容都是基于RxJava2的。
Observer和Observable
学习RxJava之前,你需要了解什么是Reactive,我的理解是应该要和传统的代码进行对比学习,我们一般写代码都是命令式的,我们希望做什么就做什么,比如我们想下载一张图片,然后判断图片是否下载成功,如果成功了就展示出来,如果没有下载成功则使用兜底图片进行展示,如果没有兜底图片则不展示。下面是这个功能的伪代码实现:
Image img = EntryDownloadHelper.downloadImageByUrl(url, timeout)
if img is null
then
if FALLBACK_IMG != null
then img = FALLBACK_IMG
if img != null
then
ShowEntryHelper.showImage(img, height, weight)
看起来很熟悉并且很容易理解,那什么是Reactive的呢?如果使用RxJava来重写上面的代码,则代码看起来像下面这样:
String imgUrl = "xxx";
Image img = null;
Image FALLBACK_IMG = "xxx";
int timeout = 1000;
int height = 100;
int weight = 200;
Observable.create(new ObservableOnSubscribe<Image>() {
public void subscribe(ObservableEmitter<Image> e) throws Exception {
if (imgUrl == null || imgUrl.isEmpty()) {
e.onNext(FALLBACK_IMG);
} else {
img = EntryDownloadHelper.downloadImageByUrl(imgUrl, timeout);
if (img == null) {
e.onNext(FALLBACK_IMG);
} else {
e.onNext(img);
}
}
e. onComplete();
}
}).subscribe(new Observer<Image>() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(Image s) {
if (s != null) {
ShowEntryHelper.showImage(img, height, weight);
}
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
}
});
这只是一个简单的小例子,并没有什么使用价值,并且需要说明的一点是,RxJava更适合用于移动应用的开发,所以如果是做移动开发的话,学习RxJava的价值会更大,但是在一些其他的开发过程中也会使用到RxJava。
在上面的例子中,出现了两个比较关键的对象,ObServer和Observable,RxJava在实现Reactive的时候使用了观察者设计模式,Observable是被观察者,可以叫数据源,也可以叫做生产者,反正就是负责生产数据,并且将数据推送出去的东西,而ObServer是观察者对象,它会绑定到一个Observable上,并且观察Observable的行为,当ObServable触发事件的时候,ObServer会接收到事件,并且对相应的事件作出相应。所以可以将ObServer叫做事件的接收者,也可以叫做事件的消费者。有了观察者和被观察者,需要将两个角色联系起来,也就是上面所说到的将Observer绑定到Observable上,这个时候就需要使用Observable的subscribe方法,叫做订阅,下面会详细讲解Observable是如何将事件传递给Observer的。
学习一个新技术最开始需要做的就是写一个demo,并且运行起来,然后再继续学习下去。下面首先写一个RxJava的demo,下面的分析将会基于该demo:
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
e.onComplete();
}
}).subscribe(new Observer<String>() {
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
public void onNext(String s) {
System.out.println("onNext:" + s);
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
System.out.println("onComplete:");
}
});
首先需要创建一个Observable,可以使用Observable的静态方法create,当然可以直接new一个Observable对象,并且实现Observable的方法来实现,就像下面这样:
Observable<String> observable = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("ok");
observer.onComplete();
}
};
现在,Observable已经有了,下面就需要在该Observable上绑定一个Observer,就像上面的例子一样,使用Observable的subscribe方法,需要说明的一点是,可以在Observable做非常丰富的聚合操作,可以对Observable进行一系列聚合操作(比如map,filter等操作)之后再绑定Observer,但是本文不会涉及这些操作的内容,这些内容将在下一篇该系列的文章中出现。
目前Observable有六个subscribe方法供Observer选择:
- public final Disposable subscribe()
- public final Disposable subscribe(Consumer<? super T> onNext)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
- public final void subscribe(Observer<? super T> observer)
可以选择这六个中的任意一个来绑定Observer,本文以一个看起来较为简单的subscribe方法来分析,也就是上面例子中使用的版本:
- public final void subscribe(Observer<? super T> observer)
下面展示了该方法的详细实现细节:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看起来代码很多,但是核心代码就一句:subscribeActual(observer),然后:
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
* <p>There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
再看一下new一个Observable的代码:
Observable<String> observable = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
// XXX
}
};
也就是说,subscribe方法中会调用Observable的subscribeActual方法,并且将subscribe的参数(也就是绑定到该Observable的Observer)传递给subscribeActual,然后,我们在subscribeActual方法里面对subscribeActual的参数observer的操作实际上就是直接调用了Observer的方法,所以Observer当然会对响应相应的事件。
这个理解起来不太困难,下面看一下使用Observable的create静态方法来创建Observable的时候是怎么讲一个Observer绑定到一个create出来的Observable上的,回头看下面的代码:
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> e) throws Exception {
// XXX
}
}).subscribe(new Observer<String>() {
// XXX
});
这个看起来好像不能像上面那种情况一样理解,因为create的参数是new一个ObservableOnSubscribe对象,现在先来看一下create方法的具体实现细节:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到,create方法返回的是一个ObservableCreate对象,并且将我们的Observable对象传递给了ObservableCreate,这里使用了包装模式,将Observable包装成了ObservableCreate对象。在ObservableCreate类中找到了subscribeActual的实现,而这个subscribeActual正是实现了Observable的subscribeActual。所以包装需要包装彻底啊。下面是ObservableCreate类的subscribeActual的具体实现:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在subscribeActual内部,又对Observer做了一次包装,将Observer对象包装成了CreateEmitter对象,为什么呢?因为在create方法的参数中我们new的Observable是一个ObservableOnSubscribe类型的对象,而ObservableOnSubscribe的subscribe的参数需要是CreateEmitter类型的,那我们new出来的ObservableOnSubscribe到哪去了呢?看下面的构造函数:
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
可以看到,我们new出来的ObservableOnSubscribe被保存在source字段中,在来看ObservableCreate类的subscribeActual方法,其中有关键的一句话:source.subscribe(parent),source是Observable,parent是Observer,只是Observer和Observable都是被包装了一层的。如果想具体了解到底是怎么包装的,可以参考CreateEmitter类,也可以借助这个机会学习一下包装模式,还是比较有用的。
本文是对RxJava学习笔记系列的第一篇文章,内容浅显易懂,没有涉及太多的内容,主要分析了一下RxJava中的两个重要的对象,Observable和Observer,并且梳理了一下一个Observer是如何绑定到一个Observable上的,当然,这是学习RxJava的基础内容,如果对这一部分内容都不清楚的话,还需要继续学习一下,本文涉及到两个设计模式,一个是观察者模式,一个是包装模式,结合具体的例子来看还是很好理解的。本文开头还介绍了一下JSwitcher,对于学习RxJava还是比较有帮助的。下面简单做一下RxJava学习笔记系列的文章计划:
- 《RxJava学习笔记 (一)》 : 了解RxJava中的Observable和Observer,并且明白如何实现订阅
- 《RxJava学习笔记 (二)》 : RxJava中Observable丰富的聚合操作支持的学习笔记
- 《RxJava学习笔记 (三)》 : RxJava2中的线程切换学习笔记
- 《RxJava学习笔记 (四)》 : RxJava Flowable学习
暂时定这几部分内容,在总结过程中如果发现还有什么内容需要补充的时候会进行补充更新。
扫码入群
网友评论