Basic RxJava usage
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(34);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1. Analysis of the create operator
Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
For RxJavaPlugins.onAssembly, consider only the simple case first: it returns the argument passed to it unchanged, that is, it returns the ObservableCreate directly.
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); // runs the user's callback, i.e. the earlier e.onNext(34)
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
The source field is the anonymous ObservableOnSubscribe instance that was passed to Observable.create.
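2. The subscribe operator
The subscribe call here is really Observable's subscribe method. It contains little logic of its own: it invokes the abstract method subscribeActual, which subclasses must implement, and subscribeActual is where the core logic lives.
Look again at ObservableCreate's implementation of subscribeActual shown earlier.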
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
···
source.subscribe(parent);
}
Expanded, that source.subscribe call is the earlier
e.onNext(34);
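So, through the observer pattern, the observable has sent data to the observer.
CreateEmitter wraps the Observer in one more layer. When onNext or onError is called to send data to the observer, it first checks whether the subscription has been disposed; if it has, nothing is delivered to the observer.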
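The disposal check described above can be illustrated without RxJava. The following is a minimal sketch, not the library's code: SimpleEmitter and EmitterSketch are hypothetical names, and a plain java.util.function.Consumer stands in for the Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for CreateEmitter; this is NOT the RxJava class.
final class EmitterSketch {

    /** Wraps a downstream consumer and drops values once disposed. */
    static final class SimpleEmitter<T> {
        private final Consumer<T> downstream;
        private final AtomicBoolean disposed = new AtomicBoolean();

        SimpleEmitter(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            // Forward only while the subscription is still active.
            if (!disposed.get()) {
                downstream.accept(value);
            }
        }

        void dispose() {
            disposed.set(true);
        }
    }

    /** Emits 34, disposes, then tries to emit 35; returns what was received. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        SimpleEmitter<Integer> emitter = new SimpleEmitter<>(received::add);
        emitter.onNext(34);
        emitter.dispose();
        emitter.onNext(35); // dropped: the emitter is already disposed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [34]
    }
}
```

The real CreateEmitter also handles onError and onComplete and implements Disposable itself; the sketch keeps only the "check before forwarding" idea.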
4. The map operator
An example of using the map operator:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(34);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "string--"+integer;
}
}).subscribe(new Observer<String>() {
···
@Override
public void onNext(String value) {
Log.e(TAG, "onNext: "+value);
}
···
});
The source of the map operator is as follows:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
As you can see, the core of the map operator is creating an ObservableMap.
Function is an interface with a single apply method:
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(T t) throws Exception;
}
The source of ObservableMap is as follows:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
···
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
···
}
}
Part of the MapObserver code is omitted above and only the core is kept. In its subscribeActual method, ObservableMap wraps the Observer in a MapObserver and then calls subscribe on the upstream source.
![](https://img.haomeiwen.com/i13958217/4496a5d47f2a09bf.png)
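The two layers of wrapping that map performs (the Observable at assembly time, the Observer at subscription time) can be sketched in plain Java. Source, map, and MapSketch below are hypothetical names for illustration only, and Consumer again stands in for Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of the Observable / ObservableMap relationship.
final class MapSketch {

    /** A source that pushes values into whatever observer subscribes. */
    interface Source<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Plays the role of ObservableMap: it holds the upstream and the mapper. */
    static <T, R> Source<R> map(Source<T> upstream,
                                Function<? super T, ? extends R> mapper) {
        return downstream ->
            // Plays the role of MapObserver: transform, then forward downstream.
            upstream.subscribe(value -> downstream.accept(mapper.apply(value)));
    }

    /** Builds source -> map -> observer and returns what the observer saw. */
    static List<String> demo() {
        Source<Integer> source = observer -> observer.accept(34);
        Source<String> mapped = map(source, i -> "string--" + i);
        List<String> received = new ArrayList<>();
        mapped.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [string--34]
    }
}
```

Nothing runs when map is called; the mapper is applied only after the final subscribe reaches the original source, which mirrors the subscribeActual chain in the article.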
5. Thread switching: analysis of subscribeOn
The subscribeOn operator is typically used like this:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
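subscribeOn() creates an ObservableSubscribeOn; put differently, it wraps the upstream Observable in an ObservableSubscribeOn.
Schedulers.io() ultimately creates an IoScheduler.
The source of ObservableSubscribeOn.java is as follows: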
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
···
}
The upstream source.subscribe(parent) call is handed to scheduler.scheduleDirect() for execution, so it is reasonable to guess that it ends up running on a thread pool.
Next, the source of Scheduler.java:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
The Runnable passed in is ultimately submitted through the Worker's schedule method.
Now look at IoScheduler, the Scheduler implementation we specified.
IoScheduler.java
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
Look at EventLoopWorker's schedule method: the Runnable is handed to threadWorker, whose class ThreadWorker extends NewThreadWorker.
NewThreadWorker.java
private final ScheduledExecutorService executor;
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
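In the end the task is submitted to a thread pool (the ScheduledExecutorService). So the whole flow amounts to running the upstream source's subscribe call on a thread pool.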
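To make that conclusion concrete, here is a minimal sketch that moves the upstream subscribe call onto an executor. It is an illustration under stated assumptions rather than RxJava's implementation: Source, subscribeOn, SubscribeOnSketch, and the thread name "sketch-io" are all hypothetical, and a plain ExecutorService replaces Scheduler and Worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical miniature of ObservableSubscribeOn.
final class SubscribeOnSketch {

    interface Source<T> {
        void subscribe(Consumer<? super T> observer);
    }

    /** Wraps the upstream so that its subscribe call runs on the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    /** Returns the name of the thread on which the source emitted. */
    static String demo() {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> emittingThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<Integer> source = observer -> {
            emittingThread.set(Thread.currentThread().getName());
            observer.accept(34);
        };
        subscribeOn(source, executor).subscribe(value -> done.countDown());

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the background emission
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return emittingThread.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints sketch-io
    }
}
```

Because the source body itself runs on the executor thread, everything it emits is also delivered on that thread unless a later stage switches threads again.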
6. Thread switching: the observeOn operator
Calling the observeOn operator produces an ObservableObserveOn.
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
···
}
It wraps the Observer in one more layer and passes that wrapper a Scheduler.Worker created from the Scheduler we set in the observeOn operator.
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
···
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
// put the value to be sent into the queue
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// ObserveOnObserver itself implements Runnable.
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
···
a.onNext(v);
}
···
}
}
}
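The upstream ultimately has to deliver data to the Observer, so go straight to the onNext method.
In onNext, the value to be sent is first put into the queue, and then the whole ObserveOnObserver is submitted to the worker as a Runnable. The worker's concrete implementation is analyzed later.
The run method then calls drainNormal(), which takes values out of the queue and delivers them to the observer through Observer.onNext().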
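The queue-then-drain pattern described above can be sketched as follows. This is a simplified illustration, not the ObserveOnObserver source: QueueingObserver, ObserveOnSketch, and the thread name "sketch-main" are hypothetical, a single-thread ExecutorService stands in for the Worker, and error handling, completion, and fusion are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical miniature of ObserveOnObserver.
final class ObserveOnSketch {

    static final class QueueingObserver<T> implements Runnable {
        private final Consumer<? super T> downstream;
        private final ExecutorService worker;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // work in progress

        QueueingObserver(Consumer<? super T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        void onNext(T value) {
            queue.offer(value); // enqueue on the emitting thread
            if (wip.getAndIncrement() == 0) {
                worker.execute(this); // only the 0 -> 1 transition schedules a drain
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v); // deliver on the worker thread
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return; // no onNext arrived while draining
                }
            }
        }
    }

    /** Emits 1, 2, 3 from the caller; returns "thread:value" for each delivery. */
    static List<String> demo() {
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        QueueingObserver<Integer> observer = new QueueingObserver<>(
            v -> received.add(Thread.currentThread().getName() + ":" + v), worker);
        for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
        }
        worker.shutdown(); // already submitted drains still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [sketch-main:1, sketch-main:2, sketch-main:3]
    }
}
```

The counter ensures that at most one drain runs at a time, so values are delivered in order even when onNext is called while a drain is already in progress.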
AndroidSchedulers.mainThread() returns a HandlerScheduler.
AndroidSchedulers.java
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
HandlerScheduler is given a Handler that holds the main thread's Looper.
HandlerScheduler.java
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}
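The core is HandlerWorker's schedule method. Unsurprisingly it still uses a Handler to push work onto the main thread, though the usage is slightly unusual: the Runnable is placed inside a Message, and then Handler.sendMessageDelayed is called. Because this handler uses the main thread's Looper, the Runnable runs on the main thread.
The way of switching to the main thread that I had usually seen before looks like this: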
Handler handler = new Handler(getMainLooper());
handler.post(new Runnable() {
@Override
public void run() {
}
});
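Summary
The RxJava source can be summarized briefly: before subscribe is called, each operator wraps the upstream Observable in another layer; once subscribe is called, the Observer is wrapped layer by layer on the way up the chain.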