如何将一个一步操作的方法转为RX的Observable
1.定义接口Async2Rx
public interface Async2Rx<T>{
void start(AsyncCallBack<T> callback);
void cancel();
interface AsyncCallBack<T> {
void onSucc(T t);
void onError(Throwable throwable);
}
}
2.该类实现Async2Rx接口:
public static class Test implements Async2Rx<String>{
@Override
public void start(final AsyncCallBack<String> callback) {
//异步的操作,比如买东西
buySomeThing(new OnPayListener(){
@Override
public void onPaySuccess(String success) {
callback.onSucc(success);
}
@Override
public void onPayFailed(Throwable throwable) {
callback.onError(throwable);
}
});
}
@Override
public void cancel() {
//收到取消通知时,取消异步任务。比如用户调用了:Disposable.dispose()方法
cancelBuy();
}
}
3.生成Observable:
Test mTest=xxx;
Observable observable = AsyncObservable.create(mTest);
该方法,可以设置超时时间单位为秒,不设置的话默认为40秒
核心类:
public class AsyncObservable<T> extends Observable<T> {
private static final int DEFAULT_TIME_OUT = 40; //超时时间
private final Async2Rx<T> originalCall;
AsyncObservable(Async2Rx<T> originalCall) {
this.originalCall = originalCall;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CallCallback<T> callback = new CallCallback(originalCall, observer);
observer.onSubscribe(callback);
originalCall.start(callback);
}
private static final class CallCallback<T> implements Disposable, Async2Rx.AsyncCallBack<T> {
private final Observer<? super T> observer;
private Async2Rx<T> originalCall;
boolean terminated = false;
private boolean mIsDisposed = false;
CallCallback(Async2Rx<T> originalCall, Observer<? super T> observer) {
this.originalCall = originalCall;
this.observer = observer;
}
@Override
public void dispose() {
mIsDisposed = true;
this.originalCall.cancel();
}
@Override
public boolean isDisposed() {
return mIsDisposed;
}
@Override
public void onSucc(T t) {
if (!isDisposed()) {
try {
this.observer.onNext(t);
if (!isDisposed()) {
this.terminated = true;
this.observer.onComplete();
}
} catch (Throwable var6) {
if (this.terminated) {
//onNext结束后onComplete()方法出错
RxJavaPlugins.onError(var6);
} else if (!isDisposed()) {
try {
this.observer.onError(var6);
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(new CompositeException(new Throwable[]{var6, var5}));
}
}
}
}
}
@Override
public void onError(Throwable throwable) {
if (!isDisposed()) {
try {
this.observer.onError(throwable);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
RxJavaPlugins.onError(new CompositeException(new Throwable[]{throwable, var4}));
}
}
}
}
public static Observable create(Async2Rx originalCall){
return create(originalCall, DEFAULT_TIME_OUT);
}
public static Observable create(Async2Rx originalCall,int timeout){
return new AsyncObservable(originalCall).timeout(timeout, TimeUnit.SECONDS).observeOn(Schedulers.newThread());
}
}
网友评论