无背压
代码示例
// Minimal Observable example (no backpressure): emit a single item and then complete.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        LogUtils.loge("Observable onSubscribe subscribe...");
        // Only emit while the downstream has not disposed the emitter.
        if (!emitter.isDisposed()) {
            emitter.onNext("test1");
            emitter.onComplete();
        }
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        LogUtils.loge("Observer onSubscribe ...");
    }

    @Override
    public void onNext(String s) {
        LogUtils.loge("Observer onNext str = " + s);
    }

    @Override
    public void onError(Throwable e) {
        // Report the failure instead of silently swallowing it.
        LogUtils.loge("Observer onError e = " + e);
    }

    @Override
    public void onComplete() {
        LogUtils.loge("Observer onComplete ...");
    }
});
基本元素
Observable
- 观察得到的-被观察者,不支持背压
- 通过Observable创建一个可观察的序列(create)
- 通过subscribe去注册一个观察者
Observer
- 用于接收数据----观察者
- 作为Observable的subscribe的方法的参数
Disposable
- 和RxJava1的Subscription的作用相当
- 用于取消订阅和获取当前的订阅状态
ObservableOnSubscribe
- 当订阅时会触发此接口调用
- 在Observable内部,实际作用是向观察者发射数据
Emitter
- 一个发送数据的接口,和Observer的方法类似
- 本质是对Observer和Subscriber的包装
流程分解
io.reactivex.Observable#create
/**
 * Wraps the given source in an ObservableCreate and returns whatever
 * RxJavaPlugins.onAssembly yields for it.
 */
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    Observable<T> created = new ObservableCreate<T>(source);
    return RxJavaPlugins.onAssembly(created);
}
io.reactivex.plugins.RxJavaPlugins#onAssembly(io.reactivex.Observable<T>)
/**
 * Runs the Observable assembly hook when one is set.
 *
 * @param source the Observable being assembled
 * @return the hook's result, or {@code source} itself when onObservableAssembly is null
 */
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> hook = onObservableAssembly;
    // No hook assigned: hand back the very Observable that was passed in.
    if (hook == null) {
        return source;
    }
    return apply(hook, source);
}
io.reactivex.Observable#subscribe(io.reactivex.Observer<? super T>)
/**
 * Subscribes the given observer to this Observable.
 *
 * The observer is first passed through RxJavaPlugins.onSubscribe and then handed to
 * subscribeActual. Failures are not swallowed: a NullPointerException is rethrown as-is,
 * any other non-fatal Throwable is reported to RxJavaPlugins.onError and rethrown wrapped
 * in a NullPointerException (with the original as its cause).
 *
 * @param observer the observer to subscribe; must not be null
 */
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // For an Observable built with create(), this dispatches to ObservableCreate#subscribeActual.
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // onError cannot be called on the observer here, so route the failure to the plugin handler.
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
io.reactivex.plugins.RxJavaPlugins#onSubscribe(io.reactivex.Observable<T>, io.reactivex.Observer<? super T>)
/**
 * Runs the Observable subscribe hook when one is set.
 *
 * @param source   the Observable being subscribed to
 * @param observer the observer supplied by the caller
 * @return the hook's result, or {@code observer} itself when onObservableSubscribe is null
 */
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> hook = onObservableSubscribe;
    // No hook assigned: hand back the very observer that was passed in.
    if (hook == null) {
        return observer;
    }
    return apply(hook, source, observer);
}
io.reactivex.internal.operators.observable.ObservableCreate
/**
 * Connects the observer to the user-supplied source: wraps the observer in a CreateEmitter,
 * passes that emitter to the observer's onSubscribe, then invokes the source with it.
 * A non-fatal Throwable thrown by the source is forwarded through the emitter's onError.
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // Build the emitter around the downstream observer.
    CreateEmitter<T> createEmitter = new CreateEmitter<T>(observer);
    // The observer receives the emitter in onSubscribe before any item can be emitted.
    observer.onSubscribe(createEmitter);
    try {
        // Run the ObservableOnSubscribe callback that was given to create().
        source.subscribe(createEmitter);
    } catch (Throwable error) {
        Exceptions.throwIfFatal(error);
        createEmitter.onError(error);
    }
}
// Reports the disposed state by delegating to the `emitter` field.
// NOTE(review): no `emitter` field is declared in the surrounding excerpts, so this method
// presumably belongs to a wrapper class that is not shown here - TODO confirm its owner.
@Override
public boolean isDisposed() {
return emitter.isDisposed();
}
/**
 * Emitter passed to the ObservableOnSubscribe callback. It forwards signals to the wrapped
 * observer and acts as the Disposable for the subscription; its disposed state is whatever
 * the inherited AtomicReference currently holds, as interpreted by DisposableHelper.
 */
static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    /** The downstream observer that receives the forwarded signals. */
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    /** Forwards the item downstream unless this emitter is already disposed. */
    @Override
    public void onNext(T t) {
        if (isDisposed()) {
            return;
        }
        observer.onNext(t);
    }

    /** Forwards completion downstream (unless disposed) and always disposes afterwards. */
    @Override
    public void onComplete() {
        if (isDisposed()) {
            return;
        }
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        Disposable held = get();
        return DisposableHelper.isDisposed(held);
    }
}
io.reactivex.internal.disposables.DisposableHelper 单例类
public enum DisposableHelper implements Disposable {
DISPOSED
;
public static boolean isDisposed(Disposable d) {
// 判断需要dispose的对象是否是已经dispose的
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
/ 取到当前的Disposable的对象
Disposable current = field.get();
// 得到已经disposed的对象
Disposable d = DISPOSED;
if (current != d) {
// 将AtomicReference中Disposable标识为disposed状态
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
@Override
public boolean isDisposed() {
return true;
}
}
有背压
代码示例
// Minimal Flowable example (with backpressure, DROP strategy): emit a single item and then complete.
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
    LogUtils.loge("FlowableOnSubscribe subscribe");
    // Only emit while the downstream has not cancelled.
    if (!emitter.isCancelled()) {
        emitter.onNext("test11");
        emitter.onComplete();
    }
}, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        // Demand has to be requested explicitly; without it no item is delivered to onNext.
        s.request(Integer.MAX_VALUE);
        LogUtils.loge("Subscriber onSubscribe");
    }

    @Override
    public void onNext(String s) {
        LogUtils.loge("Subscriber onNext s = " + s);
    }

    @Override
    public void onError(Throwable t) {
        // Report the failure instead of silently swallowing it.
        LogUtils.loge("Subscriber onError t = " + t);
    }

    @Override
    public void onComplete() {
        LogUtils.loge("Subscriber onComplete");
    }
});
基本元素
Flowable
- 易流动的--------被观察者,支持背压
- 通过Flowable创建一个可观察的序列(create方法)
- 通过subscribe去注册一个观察者
Subscriber
- 一个单独接口,和Observer方法类似
- 作为Flowable的subscribe方法的一个参数
Subscription
- 订阅,和Rxjava1有所不同
- 支持背压,有用于背压的request方法
FlowableOnSubscribe
- 当订阅时会触发此接口调用
- 在Flowable内部,实际作用是向观察者发射数据
OnSubscribe
Emitter
- 一个发射数据的接口,和Observer的方法类似
- 本质是对Observer和Subscriber的包装
流程分解
io.reactivex.Flowable#create
/**
 * Wraps the given source and backpressure mode in a FlowableCreate and returns whatever
 * RxJavaPlugins.onAssembly yields for it.
 */
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    // The Flowable handed to the assembly hook is a FlowableCreate.
    Flowable<T> created = new FlowableCreate<T>(source, mode);
    return RxJavaPlugins.onAssembly(created);
}
io.reactivex.Flowable#subscribe(org.reactivestreams.Subscriber<? super T>)
/** Wraps the plain Subscriber in a StrictSubscriber and delegates to the FlowableSubscriber overload. */
public final void subscribe(Subscriber<? super T> s) {
    StrictSubscriber<T> strict = new StrictSubscriber<T>(s);
    subscribe(strict);
}
io.reactivex.Flowable#subscribe(io.reactivex.FlowableSubscriber<? super T>)
/**
 * Passes the subscriber through RxJavaPlugins.onSubscribe and hands the result to
 * subscribeActual (FlowableCreate#subscribeActual for a Flowable built with create()).
 */
public final void subscribe(FlowableSubscriber<? super T> s) {
    subscribeActual(RxJavaPlugins.onSubscribe(this, s));
}
io.reactivex.internal.operators.flowable.FlowableCreate
/**
 * Picks an emitter implementation according to the configured backpressure strategy, passes
 * it to the subscriber's onSubscribe, then invokes the user-supplied source with it.
 * A non-fatal Throwable thrown by the source is forwarded through the emitter's onError.
 */
@Override
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    // One emitter type per strategy; anything not listed falls back to the buffering emitter.
    switch (backpressure) {
        case MISSING:
            emitter = new MissingEmitter<T>(t);
            break;
        case ERROR:
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        case DROP:
            emitter = new DropAsyncEmitter<T>(t);
            break;
        case LATEST:
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        default:
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
    }

    // The subscriber receives the emitter in onSubscribe before the source runs.
    t.onSubscribe(emitter);
    try {
        // Run the FlowableOnSubscribe callback that was given to create().
        source.subscribe(emitter);
    } catch (Throwable error) {
        Exceptions.throwIfFatal(error);
        emitter.onError(error);
    }
}
io.reactivex.internal.operators.flowable.FlowableCreate#DropAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
}
io.reactivex.internal.operators.flowable.FlowableCreate#NoOverflowBaseAsyncEmitter
/**
 * Base emitter that delivers an item only while the counter it inherits is non-zero and
 * otherwise delegates to onOverflow(), which subclasses define.
 */
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 4127754106204442833L;

    NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
        super(downstream);
    }

    @Override
    public final void onNext(T t) {
        if (isCancelled()) {
            return;
        }

        // get() is the current counter value; it is 0 until request(n) has added demand,
        // in which case the item is not delivered and the overflow handler runs instead.
        if (get() == 0) {
            onOverflow();
            return;
        }

        downstream.onNext(t);
        // One item was delivered, so take one off the counter.
        BackpressureHelper.produced(this, 1);
    }

    /** Invoked from onNext when an item arrives while the counter is 0. */
    abstract void onOverflow();
}
io.reactivex.internal.operators.flowable.FlowableCreate#BaseEmitter
/**
 * Common base of the Flowable emitters. It is both the FlowableEmitter given to the source
 * and the Subscription given to the subscriber; the inherited AtomicLong accumulates the
 * amounts passed to request(n).
 */
abstract static class BaseEmitter<T>
        extends AtomicLong
        implements FlowableEmitter<T>, Subscription {

    private static final long serialVersionUID = 7326289992464377023L;

    /** The subscriber that receives the emitted signals. */
    final Subscriber<? super T> downstream;

    /** Tracks the cancelled state of this emitter. */
    final SequentialDisposable serial;

    BaseEmitter(Subscriber<? super T> downstream) {
        this.serial = new SequentialDisposable();
        this.downstream = downstream;
    }

    /** Disposes the serial disposable and then runs the onUnsubscribed() callback. */
    @Override
    public final void cancel() {
        serial.dispose();
        onUnsubscribed();
    }

    @Override
    public final boolean isCancelled() {
        return serial.isDisposed();
    }

    /** Adds {@code n} to the counter when SubscriptionHelper.validate accepts it. */
    @Override
    public final void request(long n) {
        if (!SubscriptionHelper.validate(n)) {
            return;
        }
        BackpressureHelper.add(this, n);
    }
}
io.reactivex.internal.subscriptions.SubscriptionHelper#validate(long)
/**
 * Checks that a requested amount is strictly positive.
 *
 * @param n the amount passed to request()
 * @return true when {@code n > 0}; otherwise an IllegalArgumentException is reported to
 *         RxJavaPlugins.onError and false is returned, so the caller adds nothing
 */
public static boolean validate(long n) {
    if (n > 0) {
        return true;
    }
    // Non-positive amounts are rejected: report the error and tell the caller to skip the add.
    RxJavaPlugins.onError(new IllegalArgumentException("n > 0 required but it was " + n));
    return false;
}
io.reactivex.internal.util.BackpressureHelper#add
/**
 * Adds {@code n} to the counter with a compare-and-set retry loop.
 *
 * @param requested the counter to update
 * @param n         the amount to add
 * @return the value held before the update, or Long.MAX_VALUE if the counter was already
 *         at Long.MAX_VALUE (in which case it is left untouched)
 */
public static long add(AtomicLong requested, long n) {
    while (true) {
        long before = requested.get();
        if (before == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        // New value is addCap(before, n); addCap is defined outside this excerpt and
        // presumably caps the sum at Long.MAX_VALUE - TODO confirm.
        long after = addCap(before, n);
        if (requested.compareAndSet(before, after)) {
            return before;
        }
    }
}
io.reactivex.internal.util.BackpressureHelper#produced
/**
 * Subtracts {@code n} from the counter with a compare-and-set retry loop.
 *
 * @param requested the counter to update
 * @param n         the number of items that were produced
 * @return the value after the update, or Long.MAX_VALUE if the counter was at
 *         Long.MAX_VALUE (in which case it is left untouched)
 */
public static long produced(AtomicLong requested, long n) {
    while (true) {
        long before = requested.get();
        if (before == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long after = before - n;
        if (after < 0L) {
            // More was produced than requested: report it and clamp the counter to zero.
            RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + after));
            after = 0L;
        }
        if (requested.compareAndSet(before, after)) {
            return after;
        }
    }
}
源码阅读总结
不调用request,onNext不被执行的原因分析
Subscriber->onSubscribe
s.request(Integer.MAX_VALUE);
FlowableCreate.BaseEmitter#request ->
io.reactivex.internal.util.BackpressureHelper#add
将设置的值更新到BaseEmitter中,BaseEmitter继承自AtomicLong
FlowableOnSubscribe-> subscribe
emitter.onNext(a);
io.reactivex.internal.operators.flowable.FlowableCreate.NoOverflowBaseAsyncEmitter#onNext
获取AtomicLong的值,不为0的时候,才会调用Subscriber的onNext方法
总之,Flowable是采用响应式拉取(由下游主动调用request)的方式来解决背压问题的
网友评论