使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete();
}
}).map(new Function<String, String>(){
@Override
public String apply(String s) throws Throwable {
return s+"<<<";
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("onSubscribe","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("onNext","onNext:"+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("onError","onError");
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete" );
}
});
发射一个字符串,经过map转化,然后发到观察者onNext输出数据
查看源码
1、Observable.create(..)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
接受一个ObservableOnSubscribe
参数,返回一个ObservableCreate
对象,并把参数传了进去,RxJavaPlugins 装配这个new的对象,返回Observable
,但实质是ObservableCreate
RxJavaPlugins是一个hook类,主要就是把你new出来的对象,使用的操作符转换成RxJava具体操作的一个类
2、.map(new Function<String, String>(){..})
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
因为create已经返回了一个Observable
对象,直接使用该对象的map方法。同样也是装配了一个ObservableMap(this,mapper)
this就是ObservableCreate
对象
mapper是需要转换的方法
这时候ObservableCreate
和ObservableMap
已经连起来了
3、.subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
装配返回一个Observable
对象实质是ObservableSubscribeOn(this, scheduler)
this是ObservableMap对象
scheduler是Schedulers.io()线程调度对象,订阅放到了io线程 或者是newThread、single...
这时候 上游的ObservableMap已经和ObservableSubscribeOn连起来了,并且指定了订阅的地方是在io线程
4、.observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
装配返回Observable
对象,实质是ObservableObserveOn
this传入的是ObservableSubscribeOn
scheduler是Android的主线程
这时候ObservableSubscribeOn
和ObservableObserveOn
连起来了
5、.subscribe(new Observer<String>(){onSubscribe、onNext...等方法})
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer); //关键方法
} catch (NullPointerException e) {
} catch (Throwable e) {
}
}
主要就是subscribeActual(observer);
当前对象是ObservableObserveOn
,也就是ObservableObserveOn
调用了subscribeActual(observer)
并把观察者的对象传了进来,也就是 onSubscribe、onNext...等方法 的对象
到这里创建、订阅、关联已经基本完成
前面的类名比较乱,什么ObservableObserveOn
,又是什么ObservableSubscribeOn
,还有Observable
,后面还有ObserveOnObserver
等等 这些我第一次看也比较晕,绕了半天,后来想想还是根据英文意思来看
比如Observable 中文 可观察的 也就是 被观察者
Observe 中文 观察的
ObservableObserveOn 中文 被观察者 观察 在...
ObservableSubscribeOn 被观察者 订阅 在...
接下来是关键
订阅
理一下 ObservableCreate
-> ObservableMap
-> ObservableSubscribeOn
-> ObservableObserveOn
-> 执行subscribeActual()
在上面的方法中 执行subscribeActual()
对象是最后一个ObservableObserveOn
ObservableObserveOn.subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
第一个判断是判断scheduler对象是否是TrampolineScheduler,这个类里面注释说 在当前线程上工作,内容放入队列,但不会立即执行,暂时忽略
下面创建了一个 Worker
,new了一个对象ObserveOnObserver
,这个静态类,继承了Observer<T>(Observer<T>是为了下游对象)和Runnable()(run方法中有个判断默认是false,暂时不知道第一个方法是做什么的暂时忽略,网上说与backpressure有关 不是背后的压力,是后面的压力)。
ObserveOnObserver
这个类里面有upstream,有queue,有worker,是用来最后执行,你定义的Observer里面的方法
source订阅了ObserveOnObserver
,这个source对象是ObservableSubscribeOn
,执行了他的subscribeActual()
ObservableSubscribeOn.subscribeActual()
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这个类是 被观察者订阅在 定义的是io线程
首先new了一个对象SubscribeOnObserver
传入了 observer
这个是具体实现最后发射的对象
接着就调用了observer
的onSubscribe(parent)
方法
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
//去掉其他代码
}
queue = new SpscLinkedArrayQueue<>(bufferSize);
downstream.onSubscribe(this);
}
}
这里做一个保留,第二个if判断if (d instanceof QueueDisposable)
判断是否是 一次性队列
接着创建了一个SpscLinkedArrayQueue
这个类里面有一个 AtomicReferenceArray
原子的引用集合,保证发射出去的数据是原子操作不可被打断。
最后调用了下游的方法,回调到了我们定义的onSubscribe方法中打印log
回到前面 最后一行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
分部来看
开始new了一个SubscribeTask这一个线程,并把parent传了进去,这个parent可以理解为当前实际操作对象 握着 下游 实例的对象
既然是个线程肯定有run方法
@Override
public void run() {
source.subscribe(parent);
}
这个source对象 实际是前一个结点的对象也就是map 也就是ObservableMap
,这里只定义了还没使用
接着上面的Task交给了scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker()
是IoScheduler中初始化的也就是指定 被观察者 在那个线程执行操作Schedulers.io()
public Worker createWorker() {
return new EventLoopWorker(pool.get()); //这里的get是返回pool的对象
}
//EventLoopWorker的构造函数
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get(); //这里的get是返回pool里一个ThreadWorker对象
}
final AtomicReference<CachedWorkerPool> pool;
static final class CachedWorkerPool implements Runnable {
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
}
这里有 一个pool.get()
对象,这是个原子性的缓存工作池
CachedWorkerPool
是个static的线程类,里面有个队列的数据结构,pool.get();
是返回一个ThreadWorker对象,循环找,找到就返回,找不到就new一个然后添加到pool里再返回,这个类也会定期清理里面的worker
回到scheduleDirect()
,第三行 new DisposeTask
一次性的任务 并传入 线程 和 “工作者”对象 。DisposeTask也是一个线程里面的run方法,执行了传入的线程。
w.schedule(task, delay, unit);
在IoScheduler中实现
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime,@NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
ThreadWorker继承NewThreadWorker 打开scheduleActual()
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
把线程和一个一次性的工作对象放入ScheduledRunnable
,在塞入线程池executor.schedule()
,前面的判断也很明显是否要延迟。
再回到前面scheduler.scheduleDirect
返回一个 DisposeTask
,再回到前面ObservableSubscribeOn.subscribeActual()
//这里复制是方法最开始的代码
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
parent.setDisposable( 把DisposeTask设置了一下 )
DisposableHelper.setOnce(this, d);
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose(); //主要是这个方法
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
这个d.dispose()
d对象是CompositeDisposable
,这个dispose()
里面主要就是把一个boolean disposed取反,主要为了 和之前绑定的线程 只能执行一次操作
到这里基本上前期的准备工作已经做好
回顾一下,开始create() 创建了ObservableCreate
-> ObservableMap
-> ObservableSubscribeOn
-> ObservableObserveOn
这些都是连起来的,并且都有一个source对象,source指向的上游对象
最后subscribe()
,执行ObservableObserveOn
的subscribeActual()
方法, 通过对source的subscribe()
,在ObservableSubscribeOn
中定义了Task线程并放到线程池中
,等待执行
SubscribeTask执行
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这个类是写在ObservableSubscribeOn
中,所以source是ObservableMap
对象subscribe()
实际执行还是subscribeActual(..)
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这个source 实际对象是 ObservableCreate
,调用了subscribe
传入了new MapObserver
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里创建了一个发射器对象CreateEmitter
,传入了一个observer
,这个observer
就是上面传过来的MapObserver
下面这个MapObserver
订阅了这个发射器对象,也就是说这个发射器对象交给了下游的MapObserver
。
再下面的source就是MainActivity(这个名字无所谓主要是后面的对象 具体是在什么地方new的)的ObservableOnSubscribe
。
开始source.subscribe(parent)
回调到了我们实例化ObservableOnSubscribe
中,重写的方法,也就是这里
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");//这个e是之前定义的发射器对象CreateEmitter
e.onComplete();
}
})
发射器onNext("hello world")
进入CreateEmitter
的onNext()
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
先是一个判空,然后一次性检查,最后observer.onNext
,这个observer是初始化传入的,之前说了是MapObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
第二个判断看变量注解是 保持上游已建立融合模式 ,这个也不是非常明白,看代码也明白这里肯定是false
下一步 mapper.apply(t)
,这个mapper是一个Function
对象,初始化的时候被赋值的,也就是创建的时候,也就是我们需要一个map操作符的时候,创建赋值的
这个apply会回调到map操作符 里面的apply(方法)。map是一个数据转换的,比如发射了int用map转换成String
转换后调用了下游的onNext,也就是下游的具体操作对象,SubscribeOnObserver
@Override
public void onNext(T t) {
downstream.onNext(t);
}
这里什么也没干 直接调用下游onNext,ObserveOnObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
onNext 第二个判断不知道是我AS抽风了还是什么原因 Alt+单击 看到的一直是false但是真正的确实 true,这里面是把值 原子性的 放到队列中
继续schedule()
这里判断保证只做一次。worker.schedule(this)
传入了一个线程也就是本身this
这里的worker是HandlerWorker是AndroidSchedulers.mainThread()
,我们定义在android的主线程执行内容。
我们知道线程执行是执行run方法,直接跳到run(),暂时忽略Android里面handler发送消息 handler.sendMessageDelayed(message, unit.toMillis(delay));
只要知道run是在主线程执行的就ok
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里的if 也是和融合相关,暂时忽略
drainNormal()
关键的来了
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
乍一看。。两无限循环。。一看就好麻烦。。。
看到开始声明了两个变量一个q和a,q是队列,a是下游对象,再看到a.onNext()基本胜利一半了,里面内容无非就是从队列中获取数据,然后发射到下游去。
检查是否中止,不中止进入下一个循环,这个循环就比较简单了,从队列中取一个值,然后判空,不为空就发射,这个a对象就是我们定义的Observer中的onNext方法
再后面是一个原子性操作,这个我不确定作用具体是什么,忙猜是防止漏掉数据,为外面的循环多一层跳出的方式。希望有人能解决我的疑惑。我也不知道为什么这里会有for的无限循环。。这可读性太差了(应该是我太菜了🤣)
看一下他是怎么检查中止的
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
判断是否是disposed,接着d是个boolean是 传进来的done,影响这个done是有在onError()
或onComplete()
的时候
如果 手动 或者try cratch 抛出了异常 这个delayError会为true 这里会调用a.onError(e);否则调用结束
这里结束的时候有个 worker.dispose();
在最开始初始化的时候 一顿封装后 把DisposeTask交给了worker对象
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
这里官方的注释已经写明了 释放pool,将工作线程放回pool并刷新过期时间,还调用了dispose()处理一些一次性的标记
到这里基本一整个的流程基本上结束了
总结
个人觉着这个RxJava就是一个个的结点+链式调用,创建一个方法,它帮你用一个实际操作的类包起来,每个结点都有一个源结点,这个源结点指向的前一个对象,最后 subscribe 订阅时候 开始执行,一步步调用 源 结点,并一步步在结点中初始化,推到最前端,调用定义的方法,e.onNext() 继续调用下游的onNext() 最后调用到Observer的观察者方法,从而形成一个完整的 链。
我画了一个过程的图
create() 创建了
ObservableCreate
-> ObservableMap
-> ObservableSubscribeOn
-> ObservableObserveOn
-> Observer
每个对象都像一个结点,结点之间进行连接,当如果把map这个结点去掉,或者增加其他结点也是没问题的
感觉有些不足比如java的一些原子性对象的使用,准备来全面学习一下
如果内容有错误请及时联系我,我并加以改正
网友评论