RxJava 官网地址:http://reactivex.io/documentation/operators.html
RxJava 中文翻译:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava Github: https://github.com/ReactiveX/RxJava
RxAndroid Github:https://github.com/ReactiveX/RxAndroid
RxJava作用
一个词:异步。
前面我们已经讲到了Android retrofit基本使用及原理,在数据请求到后,onResponse方法是在子线程内,内不能进行UI更新,故而需要用Handler或者runOnUiThread进行UI更新,在我们学习到RxJava后就可以才用一种更加简洁的方式去处理异步调用了。
RxJava好处
一个词:简洁。
随着程序逻辑变得越来越复杂,它依然能够保持简洁。
操作符和基本使用方法参见下面文字和demo
参考文章
https://www.jianshu.com/p/cd984dd5aae8
参考示例
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
https://github.com/nanchen2251/RxJava2Examples
1、引入依赖:
implementation 'io.reactivex.rxjava2:rxjava:2.2.5'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
2、操作基本步骤
①、创建被观察者--发送数据
Observable observable=Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("what");
emitter.onNext("do");
emitter.onNext("you");
emitter.onNext("ask");
emitter.onNext("me");
emitter.onComplete();
}
});
②、创建观察者--接收数据
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//d.dispose();//用于销毁资源
Log.d("onSubscribe-onNext", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d("onSubscribe-onNext", s);
}
@Override
public void onError(Throwable e) {
Log.d("onSubscribe-onError", e.toString());
}
@Override
public void onComplete() {
Log.d("onSubscribe-onComplete", "onComplete");
}
};
③、产生订阅关系
observable.subscribe(observer);
测试结果:
01-11 19:24:58.810 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: onSubscribe
01-11 19:24:58.810 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: what
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: do
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: you
01-11 19:24:58.811 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: ask
01-11 19:24:58.812 30211-30211/com.hzy.androidrxjava D/onSubscribe-onNext: me
01-11 19:24:58.812 30211-30211/com.hzy.androidrxjava D/onSubscribe-onComplete: onComplete
RxJava原理
Observable.create()方法
/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style
* 提供一个API 通过被观察者将反应式世界与回调式世界连接起来。
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//调用hook关联方法;
}
/**
* Calls the associated hook function.
* 此处将source返回通过Observable<T>去接收
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
ObservableOnSubscribe是一个接口,定义了subscribe()方法,用来接收ObservableEmitter的实例,以安全,可取消的方式发送事件;
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableEmitter是Emitter(发射器)的子类;ObservableEmitter有序的发送onNext(),onError(),onComplete()方法;通过serialize()方法确保按照你想要的顺序发送;
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Observer是一个接口,为接收基于推的通知提供了一种机制,在observable.subscribe(observer)之后,最先调用onSubscribe(),然后是数量不等的onNext(),最后仅调用一次onComplete() 或者onError(),这两个相互冲突,只有一个执行;在onSubscribe可以销毁资源,调用d.dispose();调用之后,后面的方法不再执行;
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
subscribe()中的subscribeActual(observer)在Observable是抽象方法,实际的订阅方法;操作符应该实现执行必要业务逻辑的方法,我们看下ObservableCreate类
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);//实际的订阅方法;
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
ObservableCreate是Observable的子类,重写了subscribeActual()方法,
@Override
protected void subscribeActual(Observer<? super T> observer) {
//CreateEmitter是静态匿名内部类,实现了ObservableEmitter和Disposable接口;
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//observer调用onSubscribe(),可以销毁资源
observer.onSubscribe(parent);
try {
//执行subscribe()方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
CreateEmitter内有一个serialize()方法,确保按照你想要的顺序发送事件,返回了一个SerializedEmitter实例;
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
SerializedEmitter类连续的调用onNext, onError and onComplete,发送事件;在发送事件之前都会判断emitter是否销毁资源或者是否已经调用了onError或者onComplete;如果已经调用,就不在发送事件;
以下是SerializedEmitter发送事件的逻辑判断:
SerializedEmitter(ObservableEmitter<T> emitter) {//构造函数
this.emitter = emitter;
this.error = new AtomicThrowable();
this.queue = new SpscLinkedArrayQueue<T>(16);//事件缓存数量为16;
}
@Override
public void onNext(T t) {//SerializedEmitter的onNext()
if (emitter.isDisposed() || done) {//销毁资源或者done=true(已经执行了onComplete或者)
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() == 0 && compareAndSet(0, 1)) {// 调用Native方法compareAndSet,执行CAS操作
emitter.onNext(t);//发送onNext()方法
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<T> q = queue;
synchronized (q) {
q.offer(t);//offer添加一个元素并返回true如果队列已满,则返回false
}
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {//onError根据tryOnError的返回值
if (emitter.isDisposed() || done) {//如果已经销毁子资源或者done= true;
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {//抛出异常
done = true;
drain();//drain()内部调用drainLoop()方法
return true;
}
return false;
}
@Override
public void onComplete() {
if (emitter.isDisposed() || done) {//资源销毁或者done= true
return;
}
done = true;
drain();
}
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {//drainLoop()方法,内部双层无限循环,
ObservableEmitter<T> e = emitter;
SpscLinkedArrayQueue<T> q = queue;
AtomicThrowable error = this.error;
int missed = 1;
for (;;) {
for (;;) {
if (e.isDisposed()) {//如果已经销毁资源,清空队列,退出循环
q.clear();
return;
}
if (error.get() != null) {//error不为null,队列清空,调用onError()方法,退出循环
q.clear();
e.onError(error.terminate());
return;
}
boolean d = done;
T v = q.poll();//poll 移除并返回队列头部的元素 如果队列为空,则返回null
boolean empty = v == null;
if (d && empty) {//如果done= true;并且当前事件为nul,调用onComplete()方法,退出循环
e.onComplete();
return;
}
if (empty) {//当前事件为null;
break;
}
e.onNext(v);//调用onNext()
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
参考文章
https://gank.io/post/560e15be2dca930e00da1083
https://www.jianshu.com/p/a406b94f3188
https://blog.csdn.net/firelion0725/article/details/51093512
https://github.com/rengwuxian/RxJavaSamples
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
https://github.com/nanchen2251/RxJava2Examples
https://blog.csdn.net/fengluoye2012/article/details/79149201
网友评论