Rxbus 升级为 RxJava 2 版本

作者: 壹尘子 | 来源:发表于2016-10-11 17:33 被阅读9414次

    之前写了一个简单的Rxbus模块,使用的是RxJava 1.0版本,十月底Rxjava已经更新到了2.0.0,那我就对现有的Rxbus模块做一下升级。

    之前Rxbus的连接:
    http://www.jianshu.com/writer#/notebooks/3833653/notes/4406055

    Step 1

    引入最新的RxJava

    compile 'io.reactivex.rxjava2:rxjava:2.0.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
    

    Step 2

    修改EventThread.class,删除Schedulers.immediate()相关
    因为在2.0中删除了Schedulers.immediate()这个线程的切换

    Step 3

    修改RxBus.class

    Branch 1 CompositeDisposable

    old

    //存放订阅者信息
     private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
    

    new

    //存放订阅者信息
    private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();
    

    CompositeSubscription 修改为 CompositeDisposable
    那么自然解除订阅的方式也有了修改
    old

    CompositeSubscription.unsubscribe();
    

    new

    CompositeDisposable.dispose();
    
    Branch 2 Flowable

    在2.0中增加了Flowable 这样就把 backpressure 的问题放到了Flowable中来处理,而Observable 不对backpressure进行处理了。

    但是使用Flowable还是要注意对backpressure的处理,不然还是会出现以前的问题。

    old

    Observable.just(subscriber)
                    .filter(s -> s != null)//判断订阅者不为空
                    .filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
                    .map(s -> s.getClass())
                    .flatMap(s -> Observable.from(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
                    .map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
                    .filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
                    .subscribe(m -> {
                        addSubscription(m,subscriber);
                    });
    

    new

    Flowable.just(subscriber)
                    .filter(s -> s != null)//判断订阅者不为空
                    .filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
                    .map(s -> s.getClass())
                    .flatMap(s -> Flowable.fromArray(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
                    .map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
                    .filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
                    .subscribe(m -> {
                        addSubscription(m,subscriber);
                    });
    

    并且要注意.from修改为了.fromArray

    Branch 3 Disposable

    old

    Subscription subscription = tObservable(sub.tag(), cla)
    

    new

    Disposable disposable = tObservable(sub.tag(), cla)
    
    Branch 4 元操作符修改

    old

    /**
         * 订阅事件
         * @return
         */
        public <T> Observable tObservable(int code, final Class<T> eventType) {
            return bus.ofType(Msg.class)//判断接收事件类型
                    .filter(new Func1<Msg, Boolean>() {
                        @Override
                        public Boolean call(Msg o) {
                            //过滤code同的事件
                            return o.code == code;
                        }
                    })
                    .map(new Func1<Msg, Object>() {
                        @Override
                        public Object call(Msg o) {
                            return o.object;
                        }
                    })
                    .cast(eventType);
        }
    

    new

    /**
         * 订阅事件
         * @return
         */
        public <T> Observable tObservable(int code, final Class<T> eventType) {
            return bus.ofType(Msg.class)//判断接收事件类型
                    .filter(new Predicate<Msg>() {
                        @Override
                        public boolean test(Msg msg) throws Exception {
                            return msg.code==code;
                        }
                    })
                    .map(new Function<Msg, Object>() {
                        @Override
                        public Object apply(Msg msg) throws Exception {
                            return msg.object;
                        }
                    })
                    .cast(eventType);
        }
    

    filter、map在2.0中都有修改,这里只涉及到现在所使用的操作符,其他相关操作符修改请到查看相关 doc

    Branch 5 subscribeWith

    2.0中新增了subscribeWith()方法,对于这个方法我没有找到过多的解释,这里暂时引用两篇简书中的内容加以说明

    subscribe后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的Subscriber对象, 并且同时提供了DefaultSubscriber, ResourceSubscriber,DisposableSubscriber,让他们提供了Disposable接口, 可以完成和以前类似的代码 (引用1)

    需要使用subscribeWith而不是subscribe,因为subscribe方法现在返回void (引用2)
    因为篇幅有限为避免断章取义,如果对subscribeWith不解请去引用地址查看,引用地址在下方相关连接中。

    old

    /**
         * 解除订阅者
         * @param subscriber 订阅者
         */
        public void unRegister(Object subscriber) {
            Observable.just(subscriber)
                    .filter(s -> s!=null)
                    .map(s -> subscriptions.get(s))
                    .filter(subs -> subs!=null)
                    .subscribe(subs -> {
                        subs.unsubscribe();
                        subscriptions.remove(subscriber);
    

    new

    /**
         * 解除订阅者
         * @param subscriber 订阅者
         */
        public void unRegister(Object subscriber) {
            Flowable.just(subscriber)
                    .filter(s -> s!=null)
                    .map(s -> subscriptions.get(s))
                    .filter(subs -> subs!=null)
                    .subscribeWith(new Subscriber<CompositeDisposable>() {
                        @Override
                        public void onSubscribe(Subscription s) {
    
                        }
    
                        @Override
                        public void onNext(CompositeDisposable compositeDisposable) {
                            compositeDisposable.dispose();
                            subscriptions.remove(subscriber);
                        }
    
                        @Override
                        public void onError(Throwable t) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    

    到这里Rxbus的升级就暂时完成了。

    问题

    在升级过程中也遇到了几个问题,现在暂时还没有解决,也在这里记录一下

    1.

    SerializedSubject 这个可以吧Subject序列化为线程安全的类没有找到,现在只有SerializedObserverSerializedSubscriber 这两个类,不知道以后是否会增加。

    解决
    SerializedSubject 已经变为非public类
    可以通过bus = PublishSubject.create().toSerialized();的方式获取线程安全 的对象。

    2.

    在Rxbus解除订阅时我使用了RxJava的写法,如果只改动Observable为Flowable,那么程序会报错。但是在另一段与Retrofit相关的代码中却可以使用。

    /**
         * 解除订阅者
         * @param subscriber 订阅者
         */
        public void unRegister(Object subscriber) {
            Observable.just(subscriber)
                    .filter(s -> s!=null)
                    .map(s -> subscriptions.get(s))
                    .filter(subs -> subs!=null)
                    .subscribe(subs -> {
                        subs.unsubscribe();
                        subscriptions.remove(subscriber);
    
    APIServiceManager.getInstance()        
    .getTravelNotesAPI()        
    .getTravelNotesList(key, page + "")        
    .compose(RxSchedulersHelper.io_main())        
    .compose(SchedulersHelper.handleResult())        
    .doOnTerminate(() -> view.disDialog())        
    .subscribe(s -> RxBus.getInstance().post(RxBus.TAG_DEFAULT, s.getBookses()),                
    e -> RxBus.getInstance().post(RxBus.TAG_ERROR, e.getMessage()));
    
    public interface TravelNotesAPI {    
    @GET(APIConfig.BASE_URL_TRAVEL_NOTES+"travellist?")  
    Flowable<ResponseJson<TravelNoteBook>>  
    getTravelNotesList(@Query("query") String query, @Query("page") String page);}
    

    现在2.0出的时间不长所以如果文章中出现什么问题可以给我留言

    相关连接

    RxJava 2 :
    https://github.com/ReactiveX/RxJava/tree/2.x
    RxJava 2 doc:
    http://reactivex.io/RxJava/2.x/javadoc/
    简书RxJava2说明:
    http://www.jianshu.com/p/763322683f23 (引用1)
    http://www.jianshu.com/p/850af4f09b61 (引用2)
    backpressure相关说明:
    http://www.dundunwen.com/article/275b1d92-f9da-4bb8-b111-3aa8a6ace245.html
    RxBus:
    https://github.com/hackerlc/GearApplication/tree/master/gearlibrary/src/main/java/gear/yc/com/gearlibrary/rxjava/rxbus

    相关文章

      网友评论

      • NullRoutine:博主你好,我用rxbus,在activity里面定义了统一的CompositeDisposable去取消订阅,rxbus发送的不能取消,会发生内存泄漏,然后把rxbus订阅放在 private HashMap<String, CompositeDisposable> mSubscriptionMap;,再解除订阅就不会泄漏,不太懂原因
        NullRoutine:@壹尘子 好了,是我忘记取消订阅了,尴尬。。。
        壹尘子:@请叫我搁浅 可以在debug中看一下Compisite是否真的取消成功了
      • _10_01_:问题二可以使用这种方式取消订阅:
        if (!disposable.isDisposed) disposable.dispose()
        改为Flowable和Single都可以取消订阅
        http://www.jianshu.com/p/afa9d9a86466

        CompositeDisposable mSubscriptions = new CompositeDisposable();
        mSubscriptions.clear();
        这种方式应该也能取消订阅
      • _10_01_: private final PublishSubject<Object> bus = PublishSubject.create();

        final Subject<Object> subject = bus.toSerialized();

        不知这样是否能解决问题一呢?
        壹尘子:@谷哥的小弟丶 感谢,已经调整了bus = PublishSubject.create().toSerialized();
        toSerialized() 方法中其实就是之前序列化安全对象的写法。而SerializedSubject类已经变成了非public的。
        _10_01_:@谷哥的小弟丶 参考:https://github.com/akarnokd/rxjava2-backport/blob/master/src/test/java/hu/akarnokd/rxjava2/subjects/SerializedSubjectTest.java

      本文标题:Rxbus 升级为 RxJava 2 版本

      本文链接:https://www.haomeiwen.com/subject/grrbyttx.html