美文网首页
Rxbus 的封装及其使用,支持背压

Rxbus 的封装及其使用,支持背压

作者: 懒神_ | 来源:发表于2019-06-22 15:14 被阅读0次

    Rxbus的使用:

    1、发送消息:

    RxBus.getDefault().post(type, object);
    

    2、接收消息:

    Disposable  disposable = RxBus.getDefault().register(type, object.class)
    .subscribe(o -> {
          //todo
    });
    

    3、注销消息:

    RxBus.getDefault().unregister(disposable);
    

    4、封装Rxbus

    private static volatile RxBus  mInstance;
    
    private final FlowableProcessorbus = PublishProcessor.create().toSerialized();
    
    public static RxBus getDefault() {
            if (mInstance ==null) {
                  synchronized (RxBus.class) {
                     if (mInstance ==null) {
                          mInstance =new RxBus();
                        }
                 }
          }
           return mInstance;
     }
    
       /**
       * @param tag    tag必须保持一致
       * @param object 发送的数据类型和接受的数据类型必须要一致,要不然接受不到消息
       */
    public void post(@NonNull String tag, @NonNull Object object) {
        RxBusEntity entity =new RxBusEntity();
        entity.setTag(tag);
        entity.setObject(object);
        bus.onNext(entity);
    }
    /**
    * 注册 rxbus
    *
    * @param tag
    * @param tClass
    * @param <T>
    * @return
    */
    public Flowableregister(@NonNull final String tag, @NonNull final Class tClass) {
          return register(tag, tClass, AndroidSchedulers.mainThread());
    }
    public Flowableregister(@NonNull final String tag, final Class tClass, Scheduler scheduler) {
         return bus.filter(o -> {
                if (oinstanceof RxBusEntity) {
                     RxBusEntity entity = (RxBusEntity) o;
                     if (TextUtils.isEmpty(entity.getTag())) {
                            return false;  
                     }
                    if (entity.getTag().equals(tag)) {
                           return tClass.isInstance(entity.getObject());
                     }
              }
               return false;
        }).map(o -> ((RxBusEntity) o).getObject()).cast(tClass).observeOn(scheduler);
    }
    
    public void unregister(Disposable... disposables) {
          if (disposables ==null){
                return;
            }
          for (Disposable disposable : disposables) {
               if (disposable ==null || disposable.isDisposed()) {
                     continue;
                }
              disposable.dispose();
        }
    }
    

    5、RxBusEntity:

    private StringmTag;
    private ObjectmObject;
    public StringgetTag() {
      return mTag;
    }
    public void setTag(String tag) {
        this.mTag = tag;
    }
    public ObjectgetObject() {
        return mObject;
    }
    public void setObject(Object object) {
        this.mObject = object;
    }
    

    6、以上就是支持背压的rxbus封装,及其好用,没有太多的花里花哨,消息稳定、灵活使用、线程切换,无所不能。

    相关文章

      网友评论

          本文标题:Rxbus 的封装及其使用,支持背压

          本文链接:https://www.haomeiwen.com/subject/naxvqctx.html