Create
使用一个函数从头创建一个Observable。
一个形式正确的有限Observable必须尝试调用观察者的onCompleted一次或者它的onError一次,而且此后不能再调用观察者的任何其它方法。
在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。
value = "Create";
tvContent.setText("");
tvTips.setText("https://www.jianshu.com/p/3b37d98d60ab");
Observable<String> source = getSource(value);
// 这个修改值不起变化,从创建Observable起,就已经确定传了此值
value = value + "修改值+1";
source.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
/**
* 创建被观察者
*
* @return ObservableSource
*/
private Observable<String> getSource(String value) {
return Observable.create(observableEmitter -> {
if (!observableEmitter.isDisposed()) {
for (int i = 1; i < 2; i++) {
observableEmitter.onNext("线程名称:" + Thread.currentThread().getName() + "\n" + "onNext:" + i + "\n");
}
observableEmitter.onNext("线程名称:" + Thread.currentThread().getName() + "\n" + "onNext:" + value + "\n");
observableEmitter.onComplete();
}
});
}
/**
* 创建观察者
*
* @return Observer
*/
private Observer<? super String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
tvContent.append("onSubscribe\n");
}
@Override
public void onNext(String string) {
System.out.println("接收----->" + string);
tvContent.append(string);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
网友评论