首先看使用代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//被观察者的create需要一个ObserableSubscribe与之产生关联
e.onNext("");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
以上是示例代码 下面进行源码分析
思路分析 Obserable.create方法 进去源码代码如下:
第一步:看create方法的参数 点进去发现它是一个接口

可以看到create方法 传入了一个ObservableOnSubscribe<T> source的东西,那么进入ObservableOnSubscribe这个源代码
可以看到它是一个接口代码如下:

可以看到接口里面有个ObservableEmitter<T> 这个是一个监听器等下会调用的
第二步再来看RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
这里创建了一个ObservableCreate的实例点进去看下如下:

网友评论