Converting Asynchronous Work into an Rx Observable


Author: 蒸汽飞船 | Published: 2018-03-18 16:18

    How to convert a method that performs an asynchronous operation into an RxJava Observable.

    1. Define the Async2Rx interface

    public interface Async2Rx<T>{
        void start(AsyncCallBack<T> callback);
        void cancel();
    
        interface AsyncCallBack<T> {
            void onSucc(T t);
            void onError(Throwable throwable);
        }
    }
    

    2. Implement the Async2Rx interface in the class that performs the asynchronous work:

    public static class Test implements Async2Rx<String>{ 
    
        @Override
        public void start(final AsyncCallBack<String> callback) {
            // The asynchronous operation, e.g. making a purchase
            buySomeThing(new OnPayListener(){
                @Override
                public void onPaySuccess(String success) {
                    callback.onSucc(success);
                }
    
                @Override
                public void onPayFailed(Throwable throwable) {
                    callback.onError(throwable);
                }
            });
        }
    
        @Override
        public void cancel() {
            // Cancel the asynchronous task when cancellation is requested,
            // e.g. when the subscriber calls Disposable.dispose()
            cancelBuy();
        }
    }
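
    The Test class above leaves buySomeThing, OnPayListener and cancelBuy undefined. As a rough, JDK-only sketch of what a complete implementation might look like, the snippet below runs the "work" on a plain thread and interrupts it on cancel. DelayedJob, its delay and its fail flag are hypothetical names made up for illustration, and the Async2Rx interface is repeated only so the snippet compiles on its own.

```java
import java.util.concurrent.TimeUnit;

// Same contract as the Async2Rx interface from step 1, repeated for self-containment.
interface Async2Rx<T> {
    void start(AsyncCallBack<T> callback);
    void cancel();

    interface AsyncCallBack<T> {
        void onSucc(T t);
        void onError(Throwable throwable);
    }
}

// Hypothetical implementation: sleeps on a worker thread, then reports one result.
class DelayedJob implements Async2Rx<String> {
    private final long delayMillis;
    private final boolean fail;
    private volatile Thread worker;

    DelayedJob(long delayMillis, boolean fail) {
        this.delayMillis = delayMillis;
        this.fail = fail;
    }

    @Override
    public void start(final AsyncCallBack<String> callback) {
        Thread t = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis); // stands in for the real work
            } catch (InterruptedException e) {
                return; // cancel() was called: report nothing
            }
            if (fail) {
                callback.onError(new IllegalStateException("payment declined"));
            } else {
                callback.onSucc("paid");
            }
        });
        worker = t;
        t.start();
    }

    @Override
    public void cancel() {
        Thread t = worker;
        if (t != null) {
            t.interrupt(); // wakes the sleeping worker, which then exits silently
        }
    }
}
```

    The point of the sketch is the contract: start() eventually invokes exactly one of onSucc or onError, and after cancel() neither is invoked.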
    

    3. Create the Observable:

    Test mTest=xxx;
    Observable<String> observable = AsyncObservable.create(mTest);
    

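    create also accepts a timeout in seconds; if you do not set one, it defaults to 40 seconds. When the timeout elapses, the subscriber receives a TimeoutException and the pending call is cancelled.

    The core class: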

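    import java.util.concurrent.TimeUnit;

    // RxJava 2.x
    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.exceptions.CompositeException;
    import io.reactivex.exceptions.Exceptions;
    import io.reactivex.plugins.RxJavaPlugins;
    import io.reactivex.schedulers.Schedulers;

    public class AsyncObservable<T> extends Observable<T> {
        private static final int DEFAULT_TIME_OUT = 40;  // default timeout, in seconds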
        private final Async2Rx<T> originalCall;
    
        AsyncObservable(Async2Rx<T> originalCall) {
            this.originalCall = originalCall;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CallCallback<T> callback = new CallCallback<>(originalCall, observer);
            observer.onSubscribe(callback);
            originalCall.start(callback);
        }
    
        private static final class CallCallback<T> implements Disposable, Async2Rx.AsyncCallBack<T> {
            private final Observer<? super T> observer;
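            private final Async2Rx<T> originalCall;
            private boolean terminated = false;
            // volatile: dispose() may be called from a different thread than the callbacks
            private volatile boolean mIsDisposed = false;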
    
            CallCallback(Async2Rx<T> originalCall, Observer<? super T> observer) {
                this.originalCall = originalCall;
                this.observer = observer;
            }
    
            @Override
            public void dispose() {
                mIsDisposed = true;
                this.originalCall.cancel();
            }
    
            @Override
            public boolean isDisposed() {
                return mIsDisposed;
            }
    
            @Override
            public void onSucc(T t) {
                if (!isDisposed()) {
                    try {
                        this.observer.onNext(t);
                        if (!isDisposed()) {
                            this.terminated = true;
                            this.observer.onComplete();
                        }
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        if (this.terminated) {
                            // onNext succeeded but onComplete() threw: the stream is already
                            // terminated, so report through the global error handler
                            RxJavaPlugins.onError(e);
                        } else if (!isDisposed()) {
                            try {
                                this.observer.onError(e);
                            } catch (Throwable inner) {
                                Exceptions.throwIfFatal(inner);
                                RxJavaPlugins.onError(new CompositeException(e, inner));
                            }
                        }
                    }
                }
            }
    
            @Override
            public void onError(Throwable throwable) {
                if (!isDisposed()) {
                    try {
                        this.observer.onError(throwable);
                    } catch (Throwable inner) {
                        Exceptions.throwIfFatal(inner);
                        RxJavaPlugins.onError(new CompositeException(throwable, inner));
                    }
    
                }
            }
        }
    
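        // Uses the default timeout of DEFAULT_TIME_OUT seconds.
        public static <T> Observable<T> create(Async2Rx<T> originalCall) {
            return create(originalCall, DEFAULT_TIME_OUT);
        }

        // timeout is in seconds; results are delivered on a new thread.
        public static <T> Observable<T> create(Async2Rx<T> originalCall, int timeout) {
            return new AsyncObservable<T>(originalCall)
                    .timeout(timeout, TimeUnit.SECONDS)
                    .observeOn(Schedulers.newThread());
        }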
    }
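
    The part of CallCallback that is easiest to get wrong is the dispose guard: once the subscriber has disposed, the underlying call is cancelled and any late result must be dropped. The snippet below is a stripped-down model of just that logic, written against the JDK alone so it can be run without RxJava. MiniObserver, GuardedCallback and RecordingObserver are hypothetical stand-ins invented for this sketch, not RxJava types, and the error plumbing (RxJavaPlugins, CompositeException) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RxJava's Observer, reduced to the three signals used here.
interface MiniObserver<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

// Models the guard logic of CallCallback: forward signals only while not disposed.
final class GuardedCallback<T> {
    private final MiniObserver<? super T> observer;
    private final Runnable cancelAction; // plays the role of Async2Rx.cancel()
    private volatile boolean disposed;

    GuardedCallback(MiniObserver<? super T> observer, Runnable cancelAction) {
        this.observer = observer;
        this.cancelAction = cancelAction;
    }

    void dispose() {
        disposed = true;
        cancelAction.run();
    }

    void onSucc(T value) {
        if (disposed) {
            return; // subscriber is gone: drop the result
        }
        observer.onNext(value);
        if (!disposed) { // onNext itself may have triggered dispose()
            observer.onComplete();
        }
    }

    void onError(Throwable error) {
        if (!disposed) {
            observer.onError(error);
        }
    }
}

// Records every signal as a string so the behaviour can be inspected.
final class RecordingObserver implements MiniObserver<String> {
    final List<String> events = new ArrayList<>();

    @Override public void onNext(String value) { events.add("next:" + value); }
    @Override public void onComplete() { events.add("complete"); }
    @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
}
```

    Passing a result to a live GuardedCallback records next followed by complete; calling dispose() first runs the cancel action once, and any later onSucc or onError is ignored.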
    
