美文网首页
使用RxJava构建一个常用的useCase

使用RxJava构建一个常用的useCase

作者: Jamesbond_5521 | 来源:发表于2022-01-24 14:39 被阅读0次

    使用RxJava构建一个常用的useCase,功能主要有2种
    1、订阅rxjava对象
    2、订阅一次rxjava对象
    FlowableUseCase代码如下

    package com.example.commonui.utils;
    
    import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
    import io.reactivex.rxjava3.core.BackpressureStrategy;
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.disposables.Disposable;
    import io.reactivex.rxjava3.functions.Action;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Supplier;
    import io.reactivex.rxjava3.schedulers.Schedulers;
    import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
    
    public  class FlowableUseCase {
    
        public static  <T> Disposable listener(Flowable flowable, Action initiator, Consumer<T> callback) {
            if(initiator == null){
                return listener(flowable,callback);
            }
            return flowable.ambWith(Flowable.create(emitter -> initiator.run(), BackpressureStrategy.BUFFER)).map(o->(T)o)
                    .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(callback);
        }
    
        public static  <T> Disposable listener(Flowable flowable, Consumer<T> callback) {
            return flowable.map(o->(T)o)
                    .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(callback);
        }
    
        public static  <T> Disposable single(Flowable flowable, Action initiator, Consumer<T> callback) {
            SingleSubscriber subscriber = new SingleSubscriber() {
                @Override
                public void callback(Object o) {
                    try {
                        if(callback!=null){
                            callback.accept((T) o);
                        }
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                }
            };
            flowable.ambWith(Flowable.create(emitter -> initiator.run(), BackpressureStrategy.BUFFER))
                    .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
            return subscriber;
        }
    
        public static  <T> Disposable single(Supplier<T> initiator, Consumer<T> callback) {
    
            SingleSubscriber subscriber = new SingleSubscriber() {
                @Override
                public void callback(Object o) {
                    try {
                        if(callback!=null){
                            callback.accept((T) o);
                        }
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    }
                }
            };
            Flowable.create(emitter -> emitter.onNext(initiator.get()), BackpressureStrategy.BUFFER)
                    .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
            return subscriber;
        }
        private static abstract class SingleSubscriber<T> extends DisposableSubscriber<T>{
    
            @Override
            protected void onStart() {
                request(1);
            }
    
            @Override
            public void onNext(T t) {
                callback(t);
            }
    
            public abstract void callback(T t);
    
            @Override
            public void onError(Throwable t) {
    
            }
    
            @Override
            public void onComplete() {
    
            }
        }
    }
    
    

    FlowableUserCaseCaller代码如下

    package com.example.commonui.utils;
    
    import android.util.ArrayMap;
    
    import androidx.lifecycle.Lifecycle;
    import androidx.lifecycle.LifecycleEventObserver;
    import androidx.lifecycle.LifecycleOwner;
    
    import javax.inject.Inject;
    
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.disposables.CompositeDisposable;
    import io.reactivex.rxjava3.functions.Action;
    import io.reactivex.rxjava3.functions.Consumer;
    import io.reactivex.rxjava3.functions.Supplier;
    
    public class FlowableUserCaseCaller {
        Flowable source;
        Action action;
        Supplier supplier;
        Consumer consumer;
        int flag;
        ArrayMap<Integer, Action> calls = new ArrayMap<>();
        CompositeDisposable disposable = new CompositeDisposable();
    
        @Inject
        public FlowableUserCaseCaller() {
            calls.put(1, () -> disposable.add(FlowableUseCase.listener(source, action, consumer)));
            calls.put(2, () -> disposable.add(FlowableUseCase.single(source, action, consumer)));
            calls.put(3, () -> disposable.add(FlowableUseCase.single(supplier, consumer)));
        }
        public FlowableUserCaseCaller listener(Flowable source) {
            flag = 1;
            this.source = source;
            return this;
        }
        public FlowableUserCaseCaller singleListener(Flowable source) {
            this.flag = 2;
            this.source = source;
            return this;
        }
        public <T> FlowableUserCaseCaller load(Supplier<T> supplier) {
            this.flag = 3;
            this.supplier = supplier;
            return this;
        }
        public static FlowableUserCaseCaller bind(LifecycleOwner owner){
            FlowableUserCaseCaller caseCaller = new FlowableUserCaseCaller();
            owner.getLifecycle().addObserver((LifecycleEventObserver) (source, event) -> {
                if (event == Lifecycle.Event.ON_DESTROY) {
                    caseCaller.dispose();
                }
            });
            return caseCaller;
        }
    
        public FlowableUserCaseCaller load(Action action) {
            this.action = action;
            return this;
        }
        public <T> FlowableUserCaseCaller callback(Consumer<T> consumer) {
            this.consumer = consumer;
            return this;
        }
        public void start() {
            try {
                if (consumer != null) {
                    calls.get(flag).run();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    
        public void dispose() {
            if (!disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    }
    
    

    具体使用如下
    1、listener

    FlowableUserCaseCaller useCase = new FlowableUserCaseCaller ();
    useCase.listener(userManager.name).callback(o -> {
                //通知界面更新
                name.setValue((String) o);
            }).start();
    

    2、singleListener

    useCase.singleListener(userManager.singleTest)
                    .load(userManager::test)
                    .callback(o -> {
                        name.setValue(o.toString());
                    }).start();
    

    3、单次执行方法无监听

    useCase.load(userManager::getUser).callback((Consumer<Result<List<User>>>) o -> {
                if (o.isValid()) {
                    datas.setValue(o.getData());
                } else {
                    Log.i("xiaochangyan", "code:" + o.getCode() + " message:" + o.getMessage());
                }
                isRefreshing.notifyChange();
            }).start();
    

    4、释放资源

    useCase.dispose()
    

    相关文章

      网友评论

          本文标题:使用RxJava构建一个常用的useCase

          本文链接:https://www.haomeiwen.com/subject/vqubhrtx.html