之前写了一个简单的Rxbus模块,使用的是RxJava 1.0版本,十月底Rxjava已经更新到了2.0.0,那我就对现有的Rxbus模块做一下升级。
之前Rxbus的连接:
http://www.jianshu.com/writer#/notebooks/3833653/notes/4406055
Step 1
引入最新的RxJava
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
Step 2
修改EventThread.class,删除Schedulers.immediate()相关
因为在2.0中删除了Schedulers.immediate()这个线程的切换
Step 3
修改RxBus.class
Branch 1 CompositeDisposable
old
//存放订阅者信息
private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
new
//存放订阅者信息
private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();
CompositeSubscription 修改为 CompositeDisposable
那么自然解除订阅的方式也有了修改
old
CompositeSubscription.unsubscribe();
new
CompositeDisposable.dispose();
Branch 2 Flowable
在2.0中增加了Flowable 这样就把 backpressure 的问题放到了Flowable中来处理,而Observable 不对backpressure进行处理了。
但是使用Flowable还是要注意对backpressure的处理,不然还是会出现以前的问题。
old
Observable.just(subscriber)
.filter(s -> s != null)//判断订阅者不为空
.filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
.map(s -> s.getClass())
.flatMap(s -> Observable.from(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
.map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
.filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
.subscribe(m -> {
addSubscription(m,subscriber);
});
new
Flowable.just(subscriber)
.filter(s -> s != null)//判断订阅者不为空
.filter(s -> subscriptions.get(subscriber)==null) //判断订阅者没有在序列中
.map(s -> s.getClass())
.flatMap(s -> Flowable.fromArray(s.getDeclaredMethods()))//获取订阅者方法并且用Observable装载
.map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且关闭安全检查提升反射效率
.filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必须被Subscribe注解
.subscribe(m -> {
addSubscription(m,subscriber);
});
并且要注意.from修改为了.fromArray
Branch 3 Disposable
old
Subscription subscription = tObservable(sub.tag(), cla)
new
Disposable disposable = tObservable(sub.tag(), cla)
Branch 4 元操作符修改
old
/**
* 订阅事件
* @return
*/
public <T> Observable tObservable(int code, final Class<T> eventType) {
return bus.ofType(Msg.class)//判断接收事件类型
.filter(new Func1<Msg, Boolean>() {
@Override
public Boolean call(Msg o) {
//过滤code同的事件
return o.code == code;
}
})
.map(new Func1<Msg, Object>() {
@Override
public Object call(Msg o) {
return o.object;
}
})
.cast(eventType);
}
new
/**
* 订阅事件
* @return
*/
public <T> Observable tObservable(int code, final Class<T> eventType) {
return bus.ofType(Msg.class)//判断接收事件类型
.filter(new Predicate<Msg>() {
@Override
public boolean test(Msg msg) throws Exception {
return msg.code==code;
}
})
.map(new Function<Msg, Object>() {
@Override
public Object apply(Msg msg) throws Exception {
return msg.object;
}
})
.cast(eventType);
}
filter、map在2.0中都有修改,这里只涉及到现在所使用的操作符,其他相关操作符修改请到查看相关 doc
Branch 5 subscribeWith
2.0中新增了subscribeWith()方法,对于这个方法我没有找到过多的解释,这里暂时引用两篇简书中的内容加以说明
subscribe后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的Subscriber对象, 并且同时提供了DefaultSubscriber, ResourceSubscriber,DisposableSubscriber,让他们提供了Disposable接口, 可以完成和以前类似的代码 (引用1)
需要使用subscribeWith而不是subscribe,因为subscribe方法现在返回void (引用2)
因为篇幅有限为避免断章取义,如果对subscribeWith不解请去引用地址查看,引用地址在下方相关连接中。
old
/**
* 解除订阅者
* @param subscriber 订阅者
*/
public void unRegister(Object subscriber) {
Observable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribe(subs -> {
subs.unsubscribe();
subscriptions.remove(subscriber);
new
/**
* 解除订阅者
* @param subscriber 订阅者
*/
public void unRegister(Object subscriber) {
Flowable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribeWith(new Subscriber<CompositeDisposable>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(CompositeDisposable compositeDisposable) {
compositeDisposable.dispose();
subscriptions.remove(subscriber);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
到这里Rxbus的升级就暂时完成了。
问题
在升级过程中也遇到了几个问题,现在暂时还没有解决,也在这里记录一下
1.
SerializedSubject 这个可以吧Subject序列化为线程安全的类没有找到,现在只有SerializedObserver、SerializedSubscriber 这两个类,不知道以后是否会增加。
解决
SerializedSubject 已经变为非public类
可以通过bus = PublishSubject.create().toSerialized();的方式获取线程安全 的对象。
2.
在Rxbus解除订阅时我使用了RxJava的写法,如果只改动Observable为Flowable,那么程序会报错。但是在另一段与Retrofit相关的代码中却可以使用。
/**
* 解除订阅者
* @param subscriber 订阅者
*/
public void unRegister(Object subscriber) {
Observable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribe(subs -> {
subs.unsubscribe();
subscriptions.remove(subscriber);
APIServiceManager.getInstance()
.getTravelNotesAPI()
.getTravelNotesList(key, page + "")
.compose(RxSchedulersHelper.io_main())
.compose(SchedulersHelper.handleResult())
.doOnTerminate(() -> view.disDialog())
.subscribe(s -> RxBus.getInstance().post(RxBus.TAG_DEFAULT, s.getBookses()),
e -> RxBus.getInstance().post(RxBus.TAG_ERROR, e.getMessage()));
public interface TravelNotesAPI {
@GET(APIConfig.BASE_URL_TRAVEL_NOTES+"travellist?")
Flowable<ResponseJson<TravelNoteBook>>
getTravelNotesList(@Query("query") String query, @Query("page") String page);}
现在2.0出的时间不长所以如果文章中出现什么问题可以给我留言
相关连接
RxJava 2 :
https://github.com/ReactiveX/RxJava/tree/2.x
RxJava 2 doc:
http://reactivex.io/RxJava/2.x/javadoc/
简书RxJava2说明:
http://www.jianshu.com/p/763322683f23 (引用1)
http://www.jianshu.com/p/850af4f09b61 (引用2)
backpressure相关说明:
http://www.dundunwen.com/article/275b1d92-f9da-4bb8-b111-3aa8a6ace245.html
RxBus:
https://github.com/hackerlc/GearApplication/tree/master/gearlibrary/src/main/java/gear/yc/com/gearlibrary/rxjava/rxbus
网友评论
if (!disposable.isDisposed) disposable.dispose()
改为Flowable和Single都可以取消订阅
http://www.jianshu.com/p/afa9d9a86466
CompositeDisposable mSubscriptions = new CompositeDisposable();
mSubscriptions.clear();
这种方式应该也能取消订阅
final Subject<Object> subject = bus.toSerialized();
不知这样是否能解决问题一呢?
toSerialized() 方法中其实就是之前序列化安全对象的写法。而SerializedSubject类已经变成了非public的。