Rxjava2实现Rxbus

作者: Lightofrain | 来源:发表于2016-11-09 13:08 被阅读2603次

Rxjava2在Api方面有不少变化,基于Rxjava1.x的Rxbus实现不太适用,网上关于2.0的资料很少,既然没有现成的,那就自己写一个

1.API变化
1.x中是使用PublishSubject来现实管理消息的,并使用SerializedSubject保证现成安全,在2.0中,
Subject被Processer代替,通过PublishProcesser管理消息,并通过toSerialized()方法保证现成安全

public class RxBus {
    //相当于Rxjava1.x中的Subject
    private final FlowableProcessor<Object> mBus;
    private static volatile RxBus sRxBus = null;

    private RxBus() {
        //调用toSerialized()方法,保证线程安全
        mBus = PublishProcessor.create().toSerialized();
    }

    public static synchronized RxBus getDefault() {
        if (sRxBus == null) {
            synchronized (RxBus.class) {
                if (sRxBus == null) {
                    sRxBus = new RxBus();
                }
            }
        }
        return sRxBus;
    }


    /**
     * 发送消息
     * @param o
     */
    public void post(Object o) {
        new SerializedSubscriber<>(mBus).onNext(o);
    }

    /**
     * 确定接收消息的类型
     * @param aClass
     * @param <T>
     * @return
     */
    public <T> Flowable<T> toFlowable(Class<T> aClass) {
        return mBus.ofType(aClass);
    }

    /**
     * 判断是否有订阅者
     * @return
     */
    public boolean hasSubscribers() {
        return mBus.hasSubscribers();
    }

}

2.简单封装

public class RxBusHelper {
    /**
     * 发布消息
     *
     * @param o
     */
    public static void post(Object o) {
        RxBus.getDefault().post(o);
    }

    /**
     * 接收消息,并在主线程处理
     *
     * @param aClass
     * @param disposables 用于存放消息
     * @param listener
     * @param <T>
     */
    public static <T> void doOnMainThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
        disposables.add(RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
    }

    public static <T> void doOnMainThread(Class<T> aClass, OnEventListener<T> listener) {
        RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
    }

    /**
     * 接收消息,并在子线程处理
     *
     * @param aClass
     * @param disposables
     * @param listener
     * @param <T>
     */
    public static <T> void doOnChildThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
        disposables.add(RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
    }

    public static <T> void doOnChildThread(Class<T> aClass, OnEventListener<T> listener) {
        RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
    }

    public interface OnEventListener<T> {
        void onEvent(T t);

        void onError(ErrorBean errorBean);
    }

}

3.示例
https://github.com/lightofrain/RxBusTest.git

相关文章

网友评论

  • SherlockXu8013:你好,请问下为什么要使用new SerializedSubscriber<>(mBus).onNext(o);而不直接使用mBus.onNext(o)?
  • 乘风破浪的程序员:为什么先订阅楼,再post 信息 才能收到 消息??
    LEVIN_TRUENO:跟EventBus还是有区别的,EventBus是有黏性事件可以先post 后订阅,但是RxBus这种方式应该是不支持黏性事件的
  • 乘风破浪的程序员:为什么收到的信息是乱码?
  • 适当车:mainactivity 收到消息乱码属于什么情况
    LEVIN_TRUENO:@Lightofrain 内存泄漏的话可以结合Rxlifecycle 控制解绑的
    Lightofrain:因在项目中使用RxBus,发现内存泄漏现象严重,思考了解决方式,最后发现和EventBus基本一致,所以,建议不要使用RxBus,还是EventBus靠谱

本文标题:Rxjava2实现Rxbus

本文链接:https://www.haomeiwen.com/subject/xegzuttx.html