Android进阶 Rxjava
1.Rxjava的使用
在壳工程的build.gradle
里面加上
// Dependencies to add; "..." stands for the entries that are already present.
dependencies {
...
implementation 'io.reactivex.rxjava2:rxjava:2.1.17'
// This is an Android project, so also add the rxandroid artifact below
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
}
然后在MainActivity
的onCreate()
里写上
// Demo pipeline: builds a Flowable with BackpressureStrategy.BUFFER whose source emits
// the single item "one" and then signals completion.
// NOTE(review): the Disposable returned by subscribe(..) is discarded and only an onNext
// Consumer is supplied (no error callback) - acceptable for a demo, confirm before reuse.
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("one");
emitter.onComplete();
}
},BackpressureStrategy.BUFFER)
// wrap the source with the Schedulers.newThread() scheduler
.subscribeOn(Schedulers.newThread())
// wrap again with the AndroidSchedulers.mainThread() scheduler
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
// log every received item under the "dragon" tag
Log.e("dragon",s);
}
});
我们一步步去看它怎么运行起来的,先看Create
//Flowable.class
/**
 * Wraps the given source callback and backpressure mode in a new FlowableCreate
 * and returns the result of passing it through RxJavaPlugins.onAssembly.
 *
 * @param source the emitter callback; must not be null
 * @param mode the backpressure strategy; must not be null
 * @return the value returned by RxJavaPlugins.onAssembly for the new FlowableCreate
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
// both arguments are validated eagerly, before any object is built
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
重点看到这一句 RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
它将传入的FlowableOnSubscribe(source)和背压策略mode封装成了FlowableCreate,再将这个对象直接返回了。
create()结束后,得到了一个FlowableCreate对象。
再看subscribeOn。
//Flowable.class
/**
 * Convenience overload: delegates to subscribeOn(Scheduler, boolean) with
 * requestOn set to {@code !(this instanceof FlowableCreate)}, i.e. false when
 * this Flowable is a FlowableCreate and true otherwise.
 *
 * @param scheduler the scheduler to use; must not be null
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
/**
 * Wraps this Flowable, the scheduler and the requestOn flag in a new
 * FlowableSubscribeOn and passes it through RxJavaPlugins.onAssembly.
 *
 * @param scheduler the scheduler to use; must not be null
 * @param requestOn forwarded unchanged to the FlowableSubscribeOn constructor
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
将FlowableCreate对象转换成了FlowableSubscribeOn对象。
再看observeOn。
//Flowable.class
/**
 * Convenience overload: delegates to the three-argument observeOn with
 * delayError = false and bufferSize = bufferSize().
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
/**
 * Wraps this Flowable in a new FlowableObserveOn together with the scheduler,
 * the delayError flag and the buffer size, then passes it through
 * RxJavaPlugins.onAssembly.
 *
 * @param scheduler the scheduler to use; must not be null
 * @param delayError forwarded unchanged to FlowableObserveOn
 * @param bufferSize checked with ObjectHelper.verifyPositive before use
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
将FlowableSubscribeOn对象转换成了FlowableObserveOn对象。
然后来到subscribe,感觉好戏要开始了。
//Flowable.class
/**
 * Convenience overload: delegates to the four-argument subscribe, filling in
 * Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION and
 * FlowableInternalHelper.RequestMax.INSTANCE for the callbacks not supplied.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
/**
 * Bundles the four callbacks into a LambdaSubscriber, subscribes it to this
 * Flowable and returns that same LambdaSubscriber as the Disposable.
 *
 * @param onNext must not be null
 * @param onError must not be null
 * @param onComplete must not be null
 * @param onSubscribe must not be null
 * @return the LambdaSubscriber that was subscribed
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
// hands the subscriber to subscribe(FlowableSubscriber), which starts the chain
subscribe(ls);
return ls;
}
从上面可以看到,Consumer被转换成LambdaSubscriber对象,我们看看subscribe(ls)
会做什么动作。
//Flowable.class
/**
 * Passes the subscriber through RxJavaPlugins.onSubscribe and hands the result
 * to subscribeActual, which each concrete Flowable implements.
 *
 * @param s the subscriber; must not be null
 * @throws NullPointerException rethrown unchanged if one occurs; any other
 *         non-fatal Throwable is reported to RxJavaPlugins.onError and then
 *         rethrown wrapped as the cause of a new NullPointerException
 */
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Beta
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
// the plugin hook may substitute a different Subscriber
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
不着急,先看try代码块里面的方法
//RxJavaPlugins.class (invoked above as RxJavaPlugins.onSubscribe(this, s))
/**
 * Subscriber hook: when onFlowableSubscribe is set, returns the result of
 * applying it to the source and subscriber; otherwise returns the subscriber
 * unchanged.
 *
 * @param source the Flowable being subscribed to
 * @param subscriber the subscriber passed in by the caller
 * @return the hook's result, or {@code subscriber} itself when no hook is set
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {
// read the hook field once into a local before testing and using it
BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;
if (f != null) {
return apply(f, source, subscriber);
}
return subscriber;
}
奇怪了,在这个示例中,这个方法什么都没做。
那么关键的方法应该就是subscribeActual(z)
了。我们现在已经是FlowableObserveOn了,我们去这个里面看看它做些什么。
//FlowableObserveOn.class
/**
 * Creates a Worker from this operator's scheduler, wraps the downstream
 * subscriber together with that worker, delayError and prefetch, and
 * subscribes the wrapper to the upstream source.
 *
 * @param s the downstream subscriber
 */
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
// pick the wrapper type that matches the downstream subscriber's type
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
我们还是一行行地看,这个scheduler是什么?在将FlowableSubscribeOn对象转换成FlowableObserveOn对象的时候,传入了AndroidSchedulers.mainThread()。我们可以看看这个最常用的AndroidSchedulers.mainThread()
。
(一)
/** Static factory for Schedulers backed by an Android Looper; cannot be instantiated. */
public final class AndroidSchedulers {
// Holder class for the default main-thread scheduler: a HandlerScheduler built
// on a Handler bound to Looper.getMainLooper().
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
// Obtained through RxAndroidPlugins.initMainThreadScheduler, which is given a
// Callable that returns MainHolder.DEFAULT.
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
// Private constructor that always throws, so no instance can ever be created.
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
mainThread()会返回的是MainHolder.DEFAULT。
那我们去看看这个MainHolder.DEFAULT的createWorker()方法。
//HandlerScheduler.class
/** Returns a new HandlerWorker built around this scheduler's handler. */
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
从HandlerWorker来看,它是继承了Worker的一个类,重写(@Override)了schedule(..)方法,把传入的Runnable用Handler发射了出去。
(二)
接下来,程序跑到了
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch))。
- s,我们的LambdaSubscriber对象
- worker,HandlerScheduler.createWorker()返回的HandlerWorker
- delayError,false
- prefetch,128
我们知道,source是我们刚传进来的FlowableSubscribeOn对象,那我们去看看它接下来的subscribeActual(..)方法。
这个时候,执行了FlowableSubscribeOn.subscribeActual((上面得到的)ObserveOnSubscriber)。
先看方法内容
//FlowableSubscribeOn.class
/**
 * Creates a Worker from this operator's scheduler, builds a
 * SubscribeOnSubscriber around the downstream subscriber, passes it to the
 * downstream's onSubscribe, and then schedules it on the worker.
 *
 * @param s the downstream subscriber
 */
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
// downstream receives sos as its Subscription before sos is scheduled
s.onSubscribe(sos);
// sos is itself the task handed to the worker; its run() is shown further below
w.schedule(sos);
}
这个Worker是我们之前传入的Schedulers.newThread()得到的,那我们看看这个是什么。
/**
*Schedulers.class
*/
/** Returns the result of passing NEW_THREAD through RxJavaPlugins.onNewThreadScheduler. */
@NonNull
public static Scheduler newThread() {
// As before, what comes back is simply NEW_THREAD
// (presumably only when no scheduler hook is installed - the hook method is not shown here)
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
@NonNull
static final Scheduler NEW_THREAD;
// Static initializer (abridged): NEW_THREAD is obtained from
// RxJavaPlugins.initNewThreadScheduler, which is given a NewThreadTask.
static {
...
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// Callable whose call() returns NewThreadHolder.DEFAULT.
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
// Holder class for the default instance, a NewThreadScheduler built with its no-arg constructor.
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
// RxJavaPlugins.class (invoked above as RxJavaPlugins.initNewThreadScheduler(..))
/**
 * Resolves the initial new-thread Scheduler: when onInitNewThreadHandler is
 * null the supplied Callable is invoked directly, otherwise the handler is
 * applied to the Callable.
 *
 * @param defaultScheduler Callable producing the default Scheduler; must not be null
 * @return the Scheduler from whichever path was taken
 */
@NonNull
public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
/**
 * Calls the Callable and returns its result after a null check; anything
 * thrown along the way is passed to ExceptionHelper.wrapOrThrow and the
 * result of that call is thrown.
 */
@NonNull
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
从上面的跟踪可以看出,NEW_THREAD就是NewThreadHolder.DEFAULT。
仔细看下面这个NewThreadScheduler的内容。
/** Scheduler whose createWorker() returns a new NewThreadWorker built on its ThreadFactory. */
public final class NewThreadScheduler extends Scheduler {
// factory handed to every NewThreadWorker this scheduler creates
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
// shared default factory, used by the no-arg constructor
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
// read the priority from the system property (default Thread.NORM_PRIORITY)
// and clamp it into the range [Thread.MIN_PRIORITY, Thread.MAX_PRIORITY]
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
/** Creates a scheduler that uses the shared default THREAD_FACTORY. */
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
/** Creates a scheduler that uses the supplied ThreadFactory. */
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
NewThreadWorker传入的是一个创建线程为新守护线程,优先级为normal的线程工厂。
拿到NewThreadWorker对象后,看之后这行
(一)final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
- s,上面得到的ObserveOnSubscriber
- w,NewThreadWorker
- source,之前生成的FlowableCreate对象
- nonScheduledRequests,false
接下来 s.onSubscribe(sos)
也就是上面得到的ObserveOnSubscriber.onSubscribe(SubscribeOnSubscriber)
- 跑到了LambdaSubscriber.onSubscribe(ObserveOnSubscriber),作用是
- Atomically sets the subscription on the field if it is still null.
- <p>If the field is not null and doesn't contain the {@link #CANCELLED}
- instance, the {@link #reportSubscriptionSet()} is called.
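- 没太懂。按这段注释的字面意思:如果字段里还没有Subscription,就原子地把传入的Subscription存进去;如果字段里已经有值,并且不是"已取消"的标记(CANCELLED),就调用reportSubscriptionSet()报告重复设置。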
- s.request(prefetch);也就是SubscribeOnSubscriber.request(128).
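这里也不太懂。从后面SubscribeOnSubscriber.onSubscribe(..)里的requested.getAndSet(0L)来看,此时上游的Subscription还没有设置,这128个请求应该是先被累加到requested里,等上游调用onSubscribe(..)时再通过requestUpstream(..)转发给上游。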
(二)w.schedule(sos);也就是NewThreadWorker.schedule(SubscribeOnSubscriber)。
SubscribeOnSubscriber被ScheduledExecutorService给提交运行。
/**
 * Task body executed by the worker: records the current thread, clears the
 * source field, and subscribes this object to the saved source.
 */
@Override
public void run() {
// remember the thread running this task; requestUpstream later compares against get()
lazySet(Thread.currentThread());
// the FlowableCreate object produced earlier
Publisher<T> src = source;
// the field is cleared before subscribing; only the local copy is used from here on
source = null;
src.subscribe(this);
}
看到src.subscribe(this),也就是FlowableCreate.subscribe(SubscribeOnSubscriber)
/**
 * Chooses an emitter implementation based on the backpressure mode (only the
 * default branch is shown in this abridged listing), passes it to the
 * subscriber's onSubscribe, then runs the user-supplied source callback with
 * that emitter.
 *
 * @param t the downstream subscriber
 */
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
...
default: {
// the SubscribeOnSubscriber is passed to BufferAsyncEmitter as its actual
// bufferSize() is 128
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
// i.e. SubscribeOnSubscriber.onSubscribe(BufferAsyncEmitter)
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
// fatal errors are rethrown by throwIfFatal; anything else goes to the emitter's onError
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
(一)先看 t.onSubscribe(emitter);
//SubscribeOnSubscriber.class
/**
 * Stores the upstream Subscription via SubscriptionHelper.setOnce. When that
 * succeeds, the amount held in {@code requested} is reset to 0 and, if it was
 * non-zero, requested from upstream.
 *
 * @param s the upstream Subscription
 */
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
// take the accumulated amount and reset the counter in one atomic step
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
/**
 * Requests {@code n} from {@code s}: directly when nonScheduledRequests is
 * true or the current thread equals get() (presumably the thread recorded in
 * run()), otherwise by scheduling a Request(s, n) on the worker.
 */
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
//SubscribeOnSubscriber.class
// NOTE(review): this listing is identical to the SubscribeOnSubscriber snippet
// immediately above; it appears to have been pasted twice.
/**
 * Stores the upstream Subscription via SubscriptionHelper.setOnce. When that
 * succeeds, the amount held in {@code requested} is reset to 0 and, if it was
 * non-zero, requested from upstream.
 *
 * @param s the upstream Subscription
 */
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
/**
 * Requests {@code n} from {@code s}: directly when nonScheduledRequests is
 * true or the current thread equals get(), otherwise by scheduling a
 * Request(s, n) on the worker.
 */
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
//BufferAsyncEmitter.class
/**
 * When {@code n} passes SubscriptionHelper.validate, adds it to this object's
 * counter through BackpressureHelper.add and then calls onRequested().
 *
 * @param n the amount requested by downstream
 */
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
/** Reacts to a new request by running the drain loop. */
@Override
void onRequested() {
drain();
}
// Drain loop: hands queued items to the downstream subscriber `a`, at most as many
// as the current value of get(), and signals error/complete once `done` is set and
// the queue is empty. If the queue is empty and `done` is false, nothing is emitted -
// which is why a call at this point in the walkthrough appears to do nothing.
void drain() {
// only the caller that moves wip from 0 enters the loop; others just bump the counter
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
// r: value read via get() (the counter that request() adds to); e: items emitted this pass
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
// done is read before polling the queue
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
// nothing queued yet and not done: leave the inner loop
if (empty) {
break;
}
a.onNext(o);
e++;
}
// emitted exactly r items: still check for cancellation and for the terminal state
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
// report the emitted count (presumably subtracted from the outstanding request - confirm)
if (e != 0) {
BackpressureHelper.produced(this, e);
}
// leave when no other drain() call arrived during this pass; otherwise loop again
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
跑了一圈,感觉什么都没做啊——因为此时队列还是空的:q.poll()返回null,done也是false,内层循环直接break,没有任何数据可以下发。
(二) 再看source.subscribe(emitter);
这个source很容易理解,就是我们一开始传给create()的FlowableOnSubscribe;emitter则是上面刚创建的BufferAsyncEmitter。
刺激的事情来了。我们在发射器里发射的数据,被放进了队列。
/**
 * Abridged listing: the emitted item is offered to the queue and drain() is
 * called to deliver it downstream.
 *
 * @param t the item emitted by the user's source callback
 */
@Override
public void onNext(T t) {
...
queue.offer(t);
drain();
}
// Abridged drain(): this time the queue holds the item just offered by onNext,
// so the inner loop reaches a.onNext(o) and passes the item downstream.
void drain() {
...
for (;;) {
...
while (e != r) {
...
a.onNext(o);
...
}
...
}
}
好,我们发现调用了SubscribeOnSubscriber.onNext("one");
//SubscribeOnSubscriber.class
/** Forwards the item unchanged to the downstream subscriber {@code actual}. */
@Override
public void onNext(T t) {
actual.onNext(t);
}
跑到了
//ObserveOnSubscriber.class
/** Abridged listing: after the elided steps, onNext ends by calling trySchedule(). */
@Override
public final void onNext(T t) {
...
trySchedule();
}
/** Abridged listing: schedules this object on the worker. */
final void trySchedule() {
...
worker.schedule(this);
}
得了。这个worker是什么?HandlerScheduler创建的HandlerWorker!所以它把本身作为Runnable抛给了主Looper。
//ObserveOnSubscriber.class
/** Abridged listing: the task body run by the worker; the path shown calls runAsync(). */
@Override
public final void run() {
...
runAsync();
...
}
/** Abridged listing: delivers a value {@code v} to the downstream subscriber {@code a}. */
@Override
void runAsync() {
int missed = 1;
// our LambdaSubscriber object
final Subscriber<? super T> a = actual;
...
a.onNext(v);
...
}
这里ObserveOnSubscriber的actual就是我们的LambdaSubscriber,而我们传入的Consumer正是构造LambdaSubscriber时的onNext参数,所以a.onNext(v)最终会调用到Consumer的accept(..)。
网友评论