...
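A while ago QA reported a bug: the API response said an update was required, but the update dialog never appeared. A quick look showed that the update logic relies on an EventBus-like component: an activity registers when it is entered and unregisters when it exits. The home page jumps to InitActivity to initialize the app, so its registration is temporarily inactive and the update message is never received. The previous send implementation looked like this: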
private static final Subject<Object> messenger = PublishSubject.create();
public static void send(Object message) {
messenger.onNext(message);
}
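To make the failure mode concrete, here is a minimal, library-free sketch of a bus with the same "hot" behavior as a PublishSubject-backed messenger: a message is delivered only to whoever is subscribed at the moment of sending. `HotBusSketch` and `HotBusDemo` are hypothetical names used only for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a PublishSubject-backed bus: no caching at all.
final class HotBusSketch {
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    // Registers a subscriber; it only sees messages sent from now on.
    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    void unsubscribe(Consumer<Object> subscriber) {
        subscribers.remove(subscriber);
    }

    // Delivers to whoever is subscribed right now; otherwise the message is dropped.
    void send(Object message) {
        for (Consumer<Object> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(message);
        }
    }
}

final class HotBusDemo {
    public static void main(String[] args) {
        HotBusSketch bus = new HotBusSketch();
        List<Object> received = new ArrayList<>();
        Consumer<Object> home = received::add;

        bus.subscribe(home);
        bus.unsubscribe(home);   // the home page leaves for InitActivity
        bus.send("update");      // nobody is listening, so the message is dropped
        bus.subscribe(home);     // the home page comes back too late
        System.out.println(received.isEmpty()); // prints true
    }
}
```

The message sent while the home page was unregistered is simply gone, which matches the missing dialog described above.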
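With the bug located, the requirement is simple: messages that would otherwise be missed need to be cached.
The simplest approach is to maintain a cache list directly and replay its contents to subscribers that register later: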
private static List<Object> msgSticky = new ArrayList<>();
public static void send(Object message) {
messenger.onNext(message);
}
private static final int ADD = 1;
private static final int REMOVE = 2;
public static void sendSticky(Object message) {
//sendmessage
messenger.onNext(message);
//add to cache
operate(message, ADD);
}
private static void subscribeInternal(final Receiver receiver) {
Disposable disposable = changeScheduler(receiver.scheduler)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
if (receiver.origin == null || receiver.origin.get() == null) {
return;
}
CompositeDisposable s = subscriverDisposableMap
.get(getKey(receiver.origin.get(), receiver.originClass));
if (s == null || s.isDisposed()) {
//delete from cache
operate(o, REMOVE);
return;
}
if (o.getClass().equals(receiver.messageType)) {
//delete from cache
operate(o, REMOVE);
// arguments: the message, then the original subscriber object
receiver.method.invoke(receiver.proxy, o, receiver.origin.get());
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
});
addDisposable(receiver, disposable);
}
private static void addDisposable(Receiver receiver, Disposable disposable) {
CompositeDisposable compositeDisposable = subscriverDisposableMap
.get(getKey(receiver.origin.get(), receiver.originClass));
if (compositeDisposable == null) {
compositeDisposable = new CompositeDisposable();
subscriverDisposableMap.put(getKey(receiver.origin.get(), receiver.originClass)
, compositeDisposable);
}
compositeDisposable.add(disposable);
// iterate over a copy: receivers may remove entries from msgSticky during replay
for (Object msg : new ArrayList<>(msgSticky)) {
messenger.onNext(msg);
}
}
}
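The `operate` helper called in the snippet above is not shown in the post. The following is only a guess at its shape, assuming it is a synchronized add/remove on the `msgSticky` list; `StickyCacheSketch` and `snapshot()` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sticky cache behind sendSticky(); field and constant
// names mirror the snippet above, everything else is an assumption.
final class StickyCacheSketch {
    static final int ADD = 1;
    static final int REMOVE = 2;

    private static final List<Object> msgSticky = new ArrayList<>();

    // Assumed behavior: a single synchronized entry point for every cache mutation.
    static synchronized void operate(Object message, int op) {
        if (op == ADD) {
            msgSticky.add(message);
        } else if (op == REMOVE) {
            // remove(Object) drops the first equal element and does nothing if absent
            msgSticky.remove(message);
        }
    }

    // Returns a copy so callers can iterate while the cache keeps changing.
    static synchronized List<Object> snapshot() {
        return new ArrayList<>(msgSticky);
    }
}
```

Replaying from a snapshot rather than from the live list matters here, because a receiver may call `operate(o, REMOVE)` while the cached messages are still being iterated.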
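When a sticky message is sent, a copy is added to the cache list, and when a subscriber is registered (in addDisposable) the cached sticky messages are iterated and re-sent. This meets the requirement, but it is clearly not elegant, so the better approach is to change the Subject implementation. RxJava ships with the following implementations out of the box: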
[Image: RxJava's built-in Subject implementations]
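Each Subject's responsibility is clear from its name. ReplaySubject backs up every message, but a message can never be consumed and released, so it does not meet our needs. We therefore have to implement our own Subject. The idea is simple: in short, maintain a thread-safe Subject.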
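Before looking at the RxJava code, the intended semantics can be sketched without any library: sticky messages stay cached until some handler consumes them, and every new subscriber first receives whatever is still cached. This is an illustration of the idea under those assumptions, not the implementation that follows; `StickySemanticsSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Library-free sketch of the intended semantics, not the RxJava implementation.
final class StickySemanticsSketch {
    private final List<Object> stickyCache = new ArrayList<>();
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    // Sticky messages are cached before delivery so late subscribers can still get them.
    synchronized void sendSticky(Object message) {
        stickyCache.add(message);
        for (Consumer<Object> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(message);
        }
    }

    // New subscribers first receive whatever is still cached.
    synchronized void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
        for (Object cached : new ArrayList<>(stickyCache)) {
            subscriber.accept(cached);
        }
    }

    // A handler calls this once it has acted on the message.
    // Returns false if the message was already consumed (or was never cached).
    synchronized boolean consume(Object message) {
        return stickyCache.remove(message);
    }
}
```

Unlike a replay-everything cache, a consumed message is released: a subscriber that registers after `consume` has succeeded no longer sees it.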
First, implement a sticky Disposable:
static final class StickyDisposable<T extends Message> extends AtomicBoolean implements Disposable {
final Observer<? super T> actual;
final StickySubject<T> parent;
StickyDisposable(Observer<? super T> actual, StickySubject<T> parent) {
super(true);
this.actual = actual;
this.parent = parent;
}
void onNext(T t) {
if (get()) {
this.actual.onNext(t);
}
}
void onNext(T[] t) {
if (get()) {
for (T s : t) {
this.actual.onNext(s);
}
}
}
void onComplete() {
if (get()) {
this.actual.onComplete();
}
}
void onError(Throwable t) {
if (get()) {
this.actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
if (compareAndSet(true, false)) {
parent.remove(this);
}
}
@Override
public boolean isDisposed() {
return !get();
}
}
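Like a regular Disposable, it extends AtomicBoolean for thread safety, and on dispose it removes itself from the parent Subject.
Next is the abstract message consumer class, MessageConsumer. There is nothing special about it: the only requirement is that, when accepting a message, it must make sure the message has not been consumed yet. That logic is implemented by the Subject.
Finally, the Subject itself: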
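The reason for extending AtomicBoolean can be shown in isolation: `compareAndSet(true, false)` succeeds for exactly one caller, so the removal from the parent runs once even if several threads dispose at the same time. The sketch below uses hypothetical names (`OnceHandle`, `OnceHandleDemo`) and a plain `Runnable` in place of `parent.remove(this)`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handle mirroring the dispose() logic: true = active, false = disposed.
final class OnceHandle extends AtomicBoolean {
    private final Runnable onDispose;

    OnceHandle(Runnable onDispose) {
        super(true);
        this.onDispose = onDispose;
    }

    // Only the caller that flips true -> false runs the cleanup.
    void dispose() {
        if (compareAndSet(true, false)) {
            onDispose.run();
        }
    }

    boolean isDisposed() {
        return !get();
    }
}

final class OnceHandleDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger removals = new AtomicInteger();
        OnceHandle handle = new OnceHandle(removals::incrementAndGet);

        // Eight threads race to dispose the same handle.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(handle::dispose);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println(removals.get()); // prints 1
    }
}
```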
public final class StickySubject<T extends Message> extends Subject<T> {
private final AtomicReference<Message[]> stickyMsg;
private final AtomicReference<StickyDisposable<T>[]> subscribers;
private static final StickyDisposable[] EMPTY = new StickyDisposable[0];
private static final StickyDisposable[] TERMINATED = new StickyDisposable[0];
public static <T extends Message> StickySubject<T> create() {
return new StickySubject<T>();
}
private StickySubject() {
this.stickyMsg = new AtomicReference<>(new Message[0]);
this.subscribers = new AtomicReference<StickyDisposable<T>[]>(EMPTY);
}
private boolean add(StickyDisposable<T> disposable) {
for (;;) {
StickyDisposable[] subscribers = this.subscribers.get();
if (subscribers == TERMINATED) {
return false;
}
int length = subscribers.length;
StickyDisposable[] tmp = new StickyDisposable[length + 1];
System.arraycopy(subscribers, 0, tmp, 0, length);
tmp[length] = disposable;
if (this.subscribers.compareAndSet(subscribers, tmp)) {
offerMsg(tmp);
return true;
}
}
}
private void offerMsg(StickyDisposable[] tmp) {
Message[] msg = stickyMsg.get();
for (StickyDisposable disposable : tmp) {
disposable.onNext(msg);
}
}
private synchronized boolean consume(Message msg) {
for (;;) {
StickyDisposable[] disposables = this.subscribers.get();
if (disposables == TERMINATED) {
return false;
}
Message[] msgCache = this.stickyMsg.get();
int position = 0;
for (;;) {
if (position >= msgCache.length) {
return false;
}
if (msgCache[position].equals(msg)) {
break;
}
position ++;
}
Message[] after = new Message[msgCache.length - 1];
if (msgCache.length > 1) {
System.arraycopy(msgCache, 0, after, 0, position);
System.arraycopy(msgCache, position + 1, after, position, msgCache.length - position - 1);
}
if (this.stickyMsg.compareAndSet(msgCache, after)) {
return true;
}
}
}
private synchronized boolean hasConsume(Message msg) {
Message[] msgs = this.stickyMsg.get();
for (Message sMsg : msgs) {
if (sMsg.equals(msg)) {
return false;
}
}
return true;
}
@Override
public boolean hasObservers() {
return this.subscribers.get().length > 0;
}
@Override
public boolean hasThrowable() {
return false;
}
@Override
public boolean hasComplete() {
return this.subscribers.get() == TERMINATED;
}
@Override
public Throwable getThrowable() {
return null;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
StickyDisposable<T> disposable = new StickyDisposable<T>(observer, this);
observer.onSubscribe(disposable);
if (add(disposable)) {
if (disposable.isDisposed()) {
remove(disposable);
}
} else {
observer.onComplete();
}
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public synchronized void onNext(T t) {
StickyDisposable[] stickyDisposables = this.subscribers.get();
if (t.isSticky()) {
for (;;) {
Message[] msgCache = this.stickyMsg.get();
Message[] tempCache = new Message[msgCache.length + 1];
System.arraycopy(msgCache, 0, tempCache, 0, msgCache.length);
tempCache[msgCache.length] = t;
if (stickyMsg.compareAndSet(msgCache, tempCache)) {
break;
}
}
}
for (StickyDisposable disposable : stickyDisposables) {
disposable.onNext(t);
}
}
@Override
public void onError(Throwable e) {
RxJavaPlugins.onError(e);
}
@Override
public void onComplete() {
StickyDisposable[] disposables = this.subscribers.get();
for (StickyDisposable disposable : disposables) {
disposable.onComplete();
}
this.subscribers.set(TERMINATED);
}
public void remove(StickyDisposable<T> disposable) {
for (;;) {
StickyDisposable[] disposables = this.subscribers.get();
if (disposables == EMPTY || disposables == TERMINATED) {
return;
}
int position = 0;
while (position <= disposables.length - 1 && disposable != disposables[position]) {
position++;
}
if (position > disposables.length - 1){
return;
}
StickyDisposable[] temp = new StickyDisposable[disposables.length - 1];
if (disposables.length == 1) {
temp = EMPTY;
} else {
System.arraycopy(disposables, 0, temp, 0, position);
System.arraycopy(disposables, position + 1, temp, position, disposables.length - 1 - position);
}
if (this.subscribers.compareAndSet(disposables, temp)) {
return;
}
}
}
}
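The `add`, `remove`, and `consume` methods above all follow the same lock-free copy-on-write pattern: read the current array, build a modified copy, and publish it with `compareAndSet`, retrying if another thread got there first. A stripped-down sketch of that pattern, using a hypothetical `CowArraySketch` over strings rather than the subject's own types:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Generic sketch of the copy-on-write + CAS retry loop; not tied to RxJava.
final class CowArraySketch {
    private final AtomicReference<String[]> items = new AtomicReference<>(new String[0]);

    // Appends by publishing a longer copy; retries if the array changed meanwhile.
    void add(String item) {
        for (;;) {
            String[] current = items.get();
            String[] next = Arrays.copyOf(current, current.length + 1);
            next[current.length] = item;
            if (items.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Removes the first equal element; returns false if it is not present.
    boolean remove(String item) {
        for (;;) {
            String[] current = items.get();
            int position = Arrays.asList(current).indexOf(item);
            if (position < 0) {
                return false;
            }
            String[] next = new String[current.length - 1];
            System.arraycopy(current, 0, next, 0, position);
            System.arraycopy(current, position + 1, next, position,
                    current.length - position - 1);
            if (items.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    // Returns a defensive copy of the current contents.
    String[] snapshot() {
        return items.get().clone();
    }
}
```

Because published arrays are never mutated, a reader can iterate over the array it fetched without locking. Note that in the subject above `consume`, `hasConsume`, and `onNext` are also `synchronized`; the message cache is only written from those methods, so the CAS loops on it should never actually need to retry.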
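This code was written a while ago and has no complicated logic, so this post is mainly a record. If you spot any problems or bugs, feel free to point them out. Finally, here is the link: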