美文网首页
Rxjava的粘性Subject,防止消息被遗漏

Rxjava的粘性Subject,防止消息被遗漏

作者: Huangwt | 来源:发表于2019-06-24 15:32 被阅读0次

    ...

    之前测试反馈了一个bug,接口请求到了需要更新但是dialog却没有弹出,简单的看了一下,更新的逻辑是依赖于一个类似Eventbus的内容实现,在activity进入的时候绑定,在退出的时候取消绑定,而首页会跳转InitActivity进行app的初始化,导致绑定暂时失效,所以没有接受到更新的message,之前的send实现是这样

    private static final Subject<Object> messenger =    PublishSubject.create();
    
    public static void send(Object message) {
        messenger.onNext(message);
    }
    

    那么确认bug所在之后提出的需求也很简单,需要对被遗漏的message做缓存,
    最简单的方法自然是直接维护一个cache[],在send的时候将之前的内容贴上去

    
        private static List<Object> msgSticky = new ArrayList<>();
    
        public static void send(Object message) {
            messenger.onNext(message);
        }
     
        private static final int ADD = 1;
        
        private static final int REMOVE = 2;
    
        public static void sendSticky(Object message) {
            //sendmessage
            messenger.onNext(message);
            //add to cache
            operate(message, ADD);
        }
        
        
            private static void subscribeInternal(final Receiver receiver) {
            Disposable disposable = changeScheduler(receiver.scheduler)
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) throws Exception {
                            if (receiver.origin == null || receiver.origin.get() == null) {
                                return;
                            }
                            CompositeDisposable s = subscriverDisposableMap
                                    .get(getKey(receiver.origin.get(), receiver.originClass));
                            if (s == null || s.isDisposed()) {
                            //delete from cache
                                operate(o, REMOVE);
                                return;
                            }
                            if (o.getClass().equals(receiver.messageType)) {
                            //delete from cache
                                operate(o, REMOVE);
                                // 消息, 原订阅者对象
                                receiver.method.invoke(receiver.proxy, o, receiver.origin.get());
                            }
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) {
                        }
                    });
            addDisposable(receiver, disposable);
            
            
            
            
    private static void addDisposable(Receiver receiver, Disposable disposable) {
            CompositeDisposable compositeDisposable = subscriverDisposableMap
                    .get(getKey(receiver.origin.get(), receiver.originClass));
            if (compositeDisposable == null) {
                compositeDisposable = new CompositeDisposable();
                subscriverDisposableMap.put(getKey(receiver.origin.get(), receiver.originClass)
                        , compositeDisposable);
            }
            compositeDisposable.add(disposable);
            for (Object msg : msgSticky) {
                messenger.onNext(msg);
            }
        }
    
        }
    

    发送粘性消息时添加一份备份到cacheList中,并在sendMessage中将粘性消息取出遍历发送,需求自然是实现了,但是很明显不太美观,所以最好的方法自然是修改Subject的实现,Rxjava初始有以下几个实现


    image.png

    具体每个Subject的职责也能从名字看出来,Replay自然就是所有消息都会被备份,但是无法被消费释放,所以不能满足我们的要求,所以需要自己实现一个Subject,思路也很简单,总而言之就是维护一个线程安全的Subject
    首先实现一个Sticky的Disposable

        static final class StickyDisposable<T extends Message> extends AtomicBoolean implements Disposable {
    
            final Observer<? super T> actual;
    
            final StickySubject<T> parent;
    
            StickyDisposable(Observer<? super T> actual, StickySubject<T> parent) {
                super(true);
                this.actual = actual;
                this.parent = parent;
            }
    
            void onNext(T t) {
                if (get()) {
                    this.actual.onNext(t);
                }
            }
    
            void onNext(T[] t) {
                if (get()) {
                    for (T s : t) {
                        this.actual.onNext(s);
                    }
                }
            }
    
            void onComplete() {
                if (get()) {
                    this.actual.onComplete();
                }
            }
    
            void onError(Throwable t) {
                if (get()) {
                    this.actual.onError(t);
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void dispose() {
                if (compareAndSet(true, false)) {
                    parent.remove(this);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return !get();
            }
    

    同常规的disposable一样,继承原子boolean类保证线程安全,且当dispose时从parentSubject中移除。
    接着是消息的消费者抽象类MessageConsumer,这点没什么特殊,唯一要实现的就是accept消息时需要保证消息还未被消费,逻辑由subject实现
    最后就是具体的subject:

    public final class StickySubject<T extends Message> extends Subject<T> {
    
        private final AtomicReference<Message[]> stickyMsg;
    
        private final AtomicReference<StickyDisposable<T>[]> subscribers;
    
        private static final StickyDisposable[] EMPTY = new StickyDisposable[0];
    
        private static final StickyDisposable[] TERMINATED = new StickyDisposable[0];
    
        public static <T extends Message> StickySubject<T> create() {
            return new StickySubject<T>();
        }
    
        private StickySubject() {
            this.stickyMsg = new AtomicReference<>(new Message[0]);
            this.subscribers = new AtomicReference<StickyDisposable<T>[]>(EMPTY);
        }
    
        private boolean add(StickyDisposable<T> disposable) {
            for (;;) {
                StickyDisposable[] subscribers = this.subscribers.get();
                if (subscribers == TERMINATED) {
                    return false;
                }
                int length = subscribers.length;
                StickyDisposable[] tmp = new StickyDisposable[length + 1];
                System.arraycopy(subscribers, 0, tmp, 0, length);
                tmp[length] = disposable;
                if (this.subscribers.compareAndSet(subscribers, tmp)) {
                    offerMsg(tmp);
                    return true;
                }
            }
        }
    
        private void offerMsg(StickyDisposable[] tmp) {
            Message[] msg = stickyMsg.get();
            for (StickyDisposable disposable : tmp) {
                disposable.onNext(msg);
            }
        }
    
        private synchronized boolean consume(Message msg) {
            for (;;) {
                StickyDisposable[] disposables = this.subscribers.get();
                if (disposables == TERMINATED) {
                    return false;
                }
                Message[] msgCache = this.stickyMsg.get();
                int position = 0;
                for (;;) {
                    if (position >= msgCache.length) {
                        return false;
                    }
                    if (msgCache[position].equals(msg)) {
                        break;
                    }
                    position ++;
                }
                Message[] after = new Message[msgCache.length - 1];
                if (msgCache.length > 1) {
                    System.arraycopy(msgCache, 0, after, 0, position);
                    System.arraycopy(msgCache, position + 1, after, position, msgCache.length - position - 1);
                }
                if (this.stickyMsg.compareAndSet(msgCache, after)) {
                    return true;
                }
            }
        }
    
        private synchronized boolean hasConsume(Message msg) {
            Message[] msgs = this.stickyMsg.get();
            for (Message sMsg : msgs) {
                if (sMsg.equals(msg)) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean hasObservers() {
            return this.subscribers.get().length > 0;
        }
    
        @Override
        public boolean hasThrowable() {
            return false;
        }
    
        @Override
        public boolean hasComplete() {
            return this.subscribers.get() == TERMINATED;
        }
    
        @Override
        public Throwable getThrowable() {
            return null;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            StickyDisposable<T> disposable = new StickyDisposable<T>(observer, this);
            observer.onSubscribe(disposable);
            if (add(disposable)) {
                if (disposable.isDisposed()) {
                    remove(disposable);
                }
            } else {
                observer.onComplete();
            }
        }
    
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public synchronized void onNext(T t) {
            StickyDisposable[] stickyDisposables = this.subscribers.get();
            if (t.isSticky()) {
                for (;;) {
                    Message[] msgCache = this.stickyMsg.get();
                    Message[] tempCache = new Message[msgCache.length + 1];
                    System.arraycopy(msgCache, 0, tempCache, 0, msgCache.length);
                    tempCache[msgCache.length] = t;
                    if (stickyMsg.compareAndSet(msgCache, tempCache)) {
                        break;
                    }
                }
            }
            for (StickyDisposable disposable : stickyDisposables) {
                disposable.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable e) {
            RxJavaPlugins.onError(e);
        }
    
        @Override
        public void onComplete() {
            StickyDisposable[] disposables = this.subscribers.get();
            for (StickyDisposable disposable : disposables) {
                disposable.onComplete();
            }
            this.subscribers.set(TERMINATED);
        }
    
        public void remove(StickyDisposable<T> disposable) {
            for (;;) {
                StickyDisposable[] disposables = this.subscribers.get();
                if (disposables == EMPTY || disposables == TERMINATED) {
                    return;
                }
                int position = 0;
                while (position <= disposables.length - 1 && disposable != disposables[position]) {
                    position++;
                }
                if (position > disposables.length - 1){
                    return;
                }
                StickyDisposable[] temp = new StickyDisposable[disposables.length - 1];
                if (disposables.length == 1) {
                    temp = EMPTY;
                } else {
                    System.arraycopy(disposables, 0, temp, 0, position);
                    System.arraycopy(disposables, position + 1, temp, position, disposables.length - 1 - position);
                }
                if (this.subscribers.compareAndSet(disposables, temp)) {
                    return;
                }
            }
        }
    

    因为代码写的有点久了,也没有什么复杂的逻辑,主要是做个记录,有什么问题或者漏洞欢迎指出,最后贴个链接~:

    github-MessageSample

    相关文章

      网友评论

          本文标题:Rxjava的粘性Subject,防止消息被遗漏

          本文链接:https://www.haomeiwen.com/subject/fhloqctx.html