Rxbus 升级为 RxJava 2 版本

作者: 壹尘子 | 来源:发表于2016-10-11 17:33 被阅读9414次

之前写了一个简单的Rxbus模块,使用的是RxJava 1.0版本,十月底Rxjava已经更新到了2.0.0,那我就对现有的Rxbus模块做一下升级。

之前Rxbus的连接:
http://www.jianshu.com/writer#/notebooks/3833653/notes/4406055

Step 1

引入最新的RxJava

compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'

Step 2

修改EventThread.class,删除Schedulers.immediate()相关
因为在2.0中删除了Schedulers.immediate()这个线程的切换

Step 3

修改RxBus.class

Branch 1 CompositeDisposable

old

//存放订阅者信息
 private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();

new

//存放订阅者信息
private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();

CompositeSubscription 修改为 CompositeDisposable
那么自然解除订阅的方式也有了修改
old

CompositeSubscription.unsubscribe();

new

CompositeDisposable.dispose();
Branch 2 Flowable

在2.0中增加了Flowable 这样就把 backpressure 的问题放到了Flowable中来处理,而Observable 不对backpressure进行处理了。

但是使用Flowable还是要注意对backpressure的处理,不然还是会出现以前的问题。

old

Observable.just(subscriber)
                .filter(s -> s != null)//判断订阅者不为空
                .filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
                .map(s -> s.getClass())
                .flatMap(s -> Observable.from(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
                .map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
                .filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
                .subscribe(m -> {
                    addSubscription(m,subscriber);
                });

new

Flowable.just(subscriber)
                .filter(s -> s != null)//判断订阅者不为空
                .filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
                .map(s -> s.getClass())
                .flatMap(s -> Flowable.fromArray(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
                .map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
                .filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
                .subscribe(m -> {
                    addSubscription(m,subscriber);
                });

并且要注意.from修改为了.fromArray

Branch 3 Disposable

old

Subscription subscription = tObservable(sub.tag(), cla)

new

Disposable disposable = tObservable(sub.tag(), cla)
Branch 4 元操作符修改

old

/**
     * 订阅事件
     * @return
     */
    public <T> Observable tObservable(int code, final Class<T> eventType) {
        return bus.ofType(Msg.class)//判断接收事件类型
                .filter(new Func1<Msg, Boolean>() {
                    @Override
                    public Boolean call(Msg o) {
                        //过滤code同的事件
                        return o.code == code;
                    }
                })
                .map(new Func1<Msg, Object>() {
                    @Override
                    public Object call(Msg o) {
                        return o.object;
                    }
                })
                .cast(eventType);
    }

new

/**
     * 订阅事件
     * @return
     */
    public <T> Observable tObservable(int code, final Class<T> eventType) {
        return bus.ofType(Msg.class)//判断接收事件类型
                .filter(new Predicate<Msg>() {
                    @Override
                    public boolean test(Msg msg) throws Exception {
                        return msg.code==code;
                    }
                })
                .map(new Function<Msg, Object>() {
                    @Override
                    public Object apply(Msg msg) throws Exception {
                        return msg.object;
                    }
                })
                .cast(eventType);
    }

filter、map在2.0中都有修改,这里只涉及到现在所使用的操作符,其他相关操作符修改请到查看相关 doc

Branch 5 subscribeWith

2.0中新增了subscribeWith()方法,对于这个方法我没有找到过多的解释,这里暂时引用两篇简书中的内容加以说明

subscribe后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的Subscriber对象, 并且同时提供了DefaultSubscriber, ResourceSubscriber,DisposableSubscriber,让他们提供了Disposable接口, 可以完成和以前类似的代码 (引用1)

需要使用subscribeWith而不是subscribe,因为subscribe方法现在返回void (引用2)
因为篇幅有限为避免断章取义,如果对subscribeWith不解请去引用地址查看,引用地址在下方相关连接中。

old

/**
     * 解除订阅者
     * @param subscriber 订阅者
     */
    public void unRegister(Object subscriber) {
        Observable.just(subscriber)
                .filter(s -> s!=null)
                .map(s -> subscriptions.get(s))
                .filter(subs -> subs!=null)
                .subscribe(subs -> {
                    subs.unsubscribe();
                    subscriptions.remove(subscriber);

new

/**
     * 解除订阅者
     * @param subscriber 订阅者
     */
    public void unRegister(Object subscriber) {
        Flowable.just(subscriber)
                .filter(s -> s!=null)
                .map(s -> subscriptions.get(s))
                .filter(subs -> subs!=null)
                .subscribeWith(new Subscriber<CompositeDisposable>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                    }

                    @Override
                    public void onNext(CompositeDisposable compositeDisposable) {
                        compositeDisposable.dispose();
                        subscriptions.remove(subscriber);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

到这里Rxbus的升级就暂时完成了。

问题

在升级过程中也遇到了几个问题,现在暂时还没有解决,也在这里记录一下

1.

SerializedSubject 这个可以吧Subject序列化为线程安全的类没有找到,现在只有SerializedObserverSerializedSubscriber 这两个类,不知道以后是否会增加。

解决
SerializedSubject 已经变为非public类
可以通过bus = PublishSubject.create().toSerialized();的方式获取线程安全 的对象。

2.

在Rxbus解除订阅时我使用了RxJava的写法,如果只改动Observable为Flowable,那么程序会报错。但是在另一段与Retrofit相关的代码中却可以使用。

/**
     * 解除订阅者
     * @param subscriber 订阅者
     */
    public void unRegister(Object subscriber) {
        Observable.just(subscriber)
                .filter(s -> s!=null)
                .map(s -> subscriptions.get(s))
                .filter(subs -> subs!=null)
                .subscribe(subs -> {
                    subs.unsubscribe();
                    subscriptions.remove(subscriber);
APIServiceManager.getInstance()        
.getTravelNotesAPI()        
.getTravelNotesList(key, page + "")        
.compose(RxSchedulersHelper.io_main())        
.compose(SchedulersHelper.handleResult())        
.doOnTerminate(() -> view.disDialog())        
.subscribe(s -> RxBus.getInstance().post(RxBus.TAG_DEFAULT, s.getBookses()),                
e -> RxBus.getInstance().post(RxBus.TAG_ERROR, e.getMessage()));

public interface TravelNotesAPI {    
@GET(APIConfig.BASE_URL_TRAVEL_NOTES+"travellist?")  
Flowable<ResponseJson<TravelNoteBook>>  
getTravelNotesList(@Query("query") String query, @Query("page") String page);}

现在2.0出的时间不长所以如果文章中出现什么问题可以给我留言

相关连接

RxJava 2 :
https://github.com/ReactiveX/RxJava/tree/2.x
RxJava 2 doc:
http://reactivex.io/RxJava/2.x/javadoc/
简书RxJava2说明:
http://www.jianshu.com/p/763322683f23 (引用1)
http://www.jianshu.com/p/850af4f09b61 (引用2)
backpressure相关说明:
http://www.dundunwen.com/article/275b1d92-f9da-4bb8-b111-3aa8a6ace245.html
RxBus:
https://github.com/hackerlc/GearApplication/tree/master/gearlibrary/src/main/java/gear/yc/com/gearlibrary/rxjava/rxbus

相关文章

网友评论

  • NullRoutine:博主你好,我用rxbus,在activity里面定义了统一的CompositeDisposable去取消订阅,rxbus发送的不能取消,会发生内存泄漏,然后把rxbus订阅放在 private HashMap<String, CompositeDisposable> mSubscriptionMap;,再解除订阅就不会泄漏,不太懂原因
    NullRoutine:@壹尘子 好了,是我忘记取消订阅了,尴尬。。。
    壹尘子:@请叫我搁浅 可以在debug中看一下Compisite是否真的取消成功了
  • _10_01_:问题二可以使用这种方式取消订阅:
    if (!disposable.isDisposed) disposable.dispose()
    改为Flowable和Single都可以取消订阅
    http://www.jianshu.com/p/afa9d9a86466

    CompositeDisposable mSubscriptions = new CompositeDisposable();
    mSubscriptions.clear();
    这种方式应该也能取消订阅
  • _10_01_: private final PublishSubject<Object> bus = PublishSubject.create();

    final Subject<Object> subject = bus.toSerialized();

    不知这样是否能解决问题一呢?
    壹尘子:@谷哥的小弟丶 感谢,已经调整了bus = PublishSubject.create().toSerialized();
    toSerialized() 方法中其实就是之前序列化安全对象的写法。而SerializedSubject类已经变成了非public的。
    _10_01_:@谷哥的小弟丶 参考:https://github.com/akarnokd/rxjava2-backport/blob/master/src/test/java/hu/akarnokd/rxjava2/subjects/SerializedSubjectTest.java

本文标题:Rxbus 升级为 RxJava 2 版本

本文链接:https://www.haomeiwen.com/subject/grrbyttx.html