项目引入了RxJava之后,想减少包的依赖,决定使用RxJava实现EventBus。先来一个简单的实现:
public class RxBus {
private final Subject<Object, Object> bus;
private RxBus() {
//把非线程安全的PublishSubject包装成线程安全的SerializedSubject
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getDefault() {
return SingletonHolder.INSTANCE;
}
private static class SingletonHolder {
public static volatile RxBus INSTANCE = new RxBus();
}
/**
* 发送事件
*
* @param event 事件对象
*/
public void post(Object event) {
if (bus.hasObservers()) {
bus.onNext(event);
}
}
/**
* 监听事件
*
* @param event 事件对象
* @param <T> 事件类型
* @return 特定类型的Observable
*/
public <T> Observable<T> observe(Class<T> event) {
return bus.ofType(event);
}
}
发送事件:
RxBus.getDefault().post(new TestEvent ());
接收事件:
Action1 action = new Action1<TestEvent>() {
@Override
public void call(TestEvent event) {
// TODO 事件处理
}
};
RxBus.getDefault().observe(TestEvent.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(action);
OK,完成!功能是完成了,不过使用的时候感觉没有EventBus简洁,不熟悉RxJava的可能也不好理解。稍微做下改动,加个Callback:
public abstract class Callback<T> implements Action1<T> {
/**
* 监听到事件时回调接口
*
* @param t 返回结果
*/
public abstract void onEvent(T t);
@Override
public void call(T t) {
onEvent(t);
}
}
添加subscribe方法:
/**
* 订阅事件
*
* @param eventType 事件对象
* @param callback 事件回调
* @param <T> 事件类型
*/
public <T> Subscription subscribe(Class<T> eventType,Callback<T> callback) {
return bus.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.ofType(eventType)
.subscribe(callback);
}
发送事件的地方不用动,接收事件的地方就清爽多了:
RxBus.getDefault().subscribe(TestEvent.class, new Callback<TestEvent>() {
@Override
public void onEvent(TestEvent event) {
// TODO 事件处理
}
});
取消订阅
接收事件方便清爽了,下面就要考虑下取消订阅的事了。Subscription可以通过unsubscribe()方法取消订阅,但是需要逐个取消。用过EventBus的可能习惯通过unregister统一取消订阅,可以如下修改:
首先添加一个Map存储订阅对象数组:
private Map<Object, List<Subscription>> subscriptions;
修改订阅方法:
/**
* 订阅事件
*
* @param subscriber 订阅者
* @param eventType 事件对象
* @param callback 事件回调
* @param <T> 事件类型
*/
public <T> void subscribe(Object subscriber, Class<T> eventType, Callback<T> callback) {
Subscription subscription = bus.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.ofType(eventType)
.subscribe(callback);
add(subscriber, subscription);
}
/**
* 添加订阅协议
*
* @param subscriber 订阅者
* @param subscription 订阅协议
*/
private synchronized void add(Object subscriber, Subscription subscription) {
if (subscriptions == null) {
subscriptions = new HashMap<>();
}
List<Subscription> list = subscriptions.get(subscriber);
if (list == null) {
list = new ArrayList<>();
}
list.add(subscription);
subscriptions.put(subscriber, list);
}
添加取消订阅的方法:
/**
* 取消事件订阅
*
* @param subscriber 订阅者
*/
public synchronized void unsubscribe(Object subscriber) {
if (subscriptions == null) return;
List<Subscription> list = subscriptions.get(subscriber);
if (list == null || list.isEmpty()) return;
for (Subscription subscription : list) {
subscription.unsubscribe();
}
list.clear();
subscriptions.remove(subscriber);
}
/**
* 取消所有订阅
*/
public void unsubscribeAll() {
Set<Map.Entry<Object, List<Subscription>>> set = subscriptions.entrySet();
for (Map.Entry<Object, List<Subscription>> entry : set) {
List<Subscription> list = entry.getValue();
for (Subscription subscription : list) {
subscription.unsubscribe();
}
list.clear();
}
subscriptions = null;
}
OK,发送事件的地方还是不用动,接收事件修改如下:
RxBus.getDefault().subscribe(this, TestEvent.class, new Callback<TestEvent>() {
@Override
public void onEvent(TestEvent event) {
// TODO 事件处理
}
});
取消事件订阅:
RxBus.getDefault().unsubscribe(this);
后续
如果嫌订阅方法参数过多,可以通过反射方法获取事件类型。接受事件的地方就可以精简为:
RxBus.getDefault().subscribe(this, new Callback<TestEvent>() {
@Override
public void onEvent(TestEvent event) {
// TODO 事件处理
}
});
订阅方法修改:
/**
* 订阅事件
*
* @param subscriber 订阅者
* @param callback 事件回调
* @param <T> 事件类型
*/
public <T> void subscribe(Object subscriber, Callback<T> callback) {
Class<T> eventType = RawType.getRawType(callback);
Subscription subscription = bus.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.ofType(eventType)
.subscribe(callback);
add(subscriber, subscription);
}
网友评论