美文网首页
RxBus2.0 Rxjava2.0 使用

RxBus2.0 Rxjava2.0 使用

作者: 往之_ | 来源:发表于2017-09-21 16:32 被阅读175次

    最近项目中把rxjava 切换到2.0 所以相对应的一些都要做出改变 新版本的 独立出来一个Flowable 来处理背压事件.
    下面就是修改过的 适用于Rxjava2.0 版本的 rxbus

    /**
     * Created by storm on 2017/9/20.
     * <p>
     * Rxbus
     */
    
    public class RxBus {
    
        private static final String TAG = RxBus.class.getSimpleName();
    
    
        private static volatile RxBus mInstance;
    
        /**
         * 默认 bus ;
         */
    
        private Subject<Object> _mBus;
    
        /**
         * 背压
         */
        private FlowableProcessor<Object> _mBackPressureBus;
    
    
        private Map<Object, CompositeDisposable> mSubscription;
    
    
        private RxBus() {
    
            _mBus = PublishSubject.create().toSerialized();
    
            _mBackPressureBus = PublishProcessor.create().toSerialized();
        }
    
    
        public static RxBus getInstance() {
    
            if (mInstance == null) {
    
                synchronized (RxBus.class) {
                    if (mInstance == null) {
    
                        mInstance = new RxBus();
                    }
                }
            }
    
            return mInstance;
        }
    
    
        /**
         * 发送普通事件
         */
        public void send(Object event) {
    
            _mBus.onNext(event);
    
        }
    
    
        /**
         * 发送背压事件
         */
        public void sendByBackPressure(Object event) {
            _mBackPressureBus.onNext(event);
    
        }
    
    
        /**
         * 接收普通事件
         */
        public <T> Observable<T> toObservable(Class<T> eventType) {
    
            return _mBus.ofType(eventType);
        }
    
    
        /**
         * 接受背压事件
         */
        public <T> Flowable<T> toFlowable(Class<T> eventType) {
    
            return _mBackPressureBus.ofType(eventType);
        }
    
    
        /**
         * 普通事件的处理
         */
        public <T> Disposable doSubscribe(Class<T> eventType, Consumer<T> next, Consumer<Throwable> error) {
    
            return toObservable(eventType)
                    .compose(RxHelper.<T>IO_Main())
                    .subscribe(next, error);
        }
    
    
        /**
         * 背压事件处理
         */
        public <T> Flowable doFlowable(Class<T> eventType, Subscriber<T> tSubscriber) {
    
            toFlowable(eventType)
                    .onBackpressureLatest() //背压策略
                    .compose(RxHelper.<T>IO_Main_Flowable())
                    .subscribeWith(tSubscriber);
    
            return toFlowable(eventType);
        }
    
    
        /**
         * 是否有订阅者
         */
        public  boolean hasSubscribers(boolean isBackPressure) {
    
            if (!isBackPressure)
                return _mBus.hasObservers();
            else
                return _mBackPressureBus.hasSubscribers();
        }
    
    
        /**
         * 背压解除订阅
         */
        public void unSubscription(){
    
            _mBackPressureBus.onComplete();
    
        }
    
    
        /**
         * 添加订阅到集合(一般事件)
         */
        public void addSubscriptions(Object o, Disposable disposable) {
    
            if (mSubscription == null) {
                mSubscription = new HashMap<>();
            }
    
            String key = o.getClass().getName();
    
            if (mSubscription.get(key) != null) {
                mSubscription.get(key).add(disposable);
    
            } else {
                CompositeDisposable compositeDisposable = new CompositeDisposable();
    
                compositeDisposable.add(disposable);
                mSubscription.put(key, compositeDisposable);
            }
    
    
        }
    
    
        /**
         * 解除订阅
         * 一般事件的解除订阅
         *
         * @param o
         */
        public void clearSubscriptions(Object o) {
            if (mSubscription == null) {
                return;
            }
    
    
            String key = o.getClass().getName();
    
            if (!mSubscription.containsKey(key)) {
                return;
            }
    
    
            if (mSubscription.get(key) != null) {
                mSubscription.get(key).dispose();
    
            }
    
            mSubscription.remove(key);
        }
    
    }
    

    相关文章

      网友评论

          本文标题:RxBus2.0 Rxjava2.0 使用

          本文链接:https://www.haomeiwen.com/subject/ubqtextx.html