美文网首页
(十四)顺序消费如何保证消息消费

(十四)顺序消费如何保证消息消费

作者: guessguess | 来源:发表于2021-08-04 19:58 被阅读0次

    集群模式下顺序消费的时候,是通过加锁的方式对队列进行占有。
    其实在单个消费者单个队列的情况下,加锁感觉是多余的。但是如果要支持消费者集群,那自然免不了加锁。
    顺序消费的方式,其实多个消费者还是要等,那么多个消费者的目的,应该是高可用。

    先来看看几个上锁以及解锁的机制。

    负载服务对于队列的锁定
    public class RebalanceService extends ServiceThread {
        private static long waitInterval =
            Long.parseLong(System.getProperty(
                "rocketmq.client.rebalance.waitInterval", "20000"));
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
            log.info(this.getServiceName() + " service end");
        }
    }
    

    负载服务对于消息队列的锁定大概是20秒一次。对重新分配到的队列申请上锁。

    Broker端上锁
    public class RebalanceLockManager {
        LockEntry用于记录每个客户端的上锁时间
        static class LockEntry {
            private String clientId;
            private volatile long lastUpdateTimestamp = System.currentTimeMillis();
            public boolean isExpired() {
                boolean expired =
                    (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
    
                return expired;
            }
        }
    }
    

    REBALANCE_LOCK_MAX_LIVE_TIME为60秒。即每次上锁的有效时间为60秒。

    消费者内部对于队列的锁

    只有开始消费该队列的消息时才会开始上锁,代码如下

    public class ConsumeMessageOrderlyService implements ConsumeMessageService {
        class ConsumeRequest implements Runnable {
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
    
            public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
                this.processQueue = processQueue;
                this.messageQueue = messageQueue;
            }
    
            public ProcessQueue getProcessQueue() {
                return processQueue;
            }
    
            public MessageQueue getMessageQueue() {
                return messageQueue;
            }
    
            @Override
            public void run() {
                if (this.processQueue.isDropped()) {
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
    
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                ........
                }
    }
    

    在对队列进行消费之前,又会上锁。这里是以队列作为锁的粒度,保证在消费者中,队列同一时刻只能被一个线程消费。

    开始消费要进行加锁
        class ConsumeRequest implements Runnable {
            private final ProcessQueue processQueue;
            private final MessageQueue messageQueue;
    
            public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
                this.processQueue = processQueue;
                this.messageQueue = messageQueue;
            }
    
            public ProcessQueue getProcessQueue() {
                return processQueue;
            }
    
            public MessageQueue getMessageQueue() {
                return messageQueue;
            }
    
            @Override
            public void run() {
                if (this.processQueue.isDropped()) {
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
    
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                                boolean hasException = false;
                                try {
                                    表明开始消费
                                    this.processQueue.getLockConsume().lock();
                                    if (this.processQueue.isDropped()) {
                                        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                            this.messageQueue);
                                        break;
                                    }
    
                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                } catch (Throwable e) {
                                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                        RemotingHelper.exceptionSimpleDesc(e),
                                        ConsumeMessageOrderlyService.this.consumerGroup,
                                        msgs,
                                        messageQueue);
                                    hasException = true;
                                } finally {
                                    this.processQueue.getLockConsume().unlock();
                                }
                }
    }
    

    对队列添加消费的锁,表明已经开始消费。

    从源码的结构看,总共是有三把锁,用于保证顺序消费。
    首先是Broker端,用于保证不被其他消费者消费。总共还是很好理解的。
    其次,对于consumer中队列的锁,保证处理队列同一时刻只会被一个线程所消费。
    那么处理队列的消费锁,是用来做什么的?

    那么处理队列的消费锁,是用来做什么的?

    其实有一种情况是,当某个队列的消息正在被消费中,如果这个时候由于负载服务,导致队列重新分配,然后将队列让出去的话,那么就无法保证顺序消费了。所以负载服务对于正在消费中的处理队列是不会移除的。
    代码如下

    public abstract class RebalanceImpl {
        private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    if (!mqSet.contains(mq)) {
                        pq.setDropped(true);
                        只有当这个方法返回true的时候,处理队列才会被移除。
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    } else if (pq.isPullExpired()) {
                        switch (this.consumeType()) {
                            case CONSUME_ACTIVELY:
                                break;
                            case CONSUME_PASSIVELY:
                                pq.setDropped(true);
                                只有当这个方法返回true的时候,处理队列才会被移除。
                                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                    it.remove();
                                    changed = true;
                                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                                }
                                break;
                            default:
                                break;
                        }
                    }
                }
            }
            。。。省略部分代码
        }
    
        @Override
        public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
            this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
            this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
            if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
                && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
                try {
                    如果拿得到消费锁,说明消费完了
                    if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                        try {
                            需要判断是否还有消息没有消费完
                            return this.unlockDelay(mq, pq);
                        } finally {
                            pq.getLockConsume().unlock();
                        }
                    } else {
                        log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                            mq,
                            pq.getTryUnlockTimes());
    
                        pq.incTryUnlockTimes();
                    }
                } catch (Exception e) {
                    log.error("removeUnnecessaryMessageQueue Exception", e);
                }
                拿不到锁,说明正在消费,会直接返回false
                return false;
            }
            return true;
        }
    
    }
    

    从上面的代码可以看出,如果拿不到消费锁,说明正在消费,直接返回false, 处理队列不会被移除。
    那如果拿得到锁,说明已经消费完,那么是不是需要判断是否还有消息没有消费完?

    public class RebalancePushImpl extends RebalanceImpl {
        private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
            如果还有消息遗留,最后还是要去释放broker端的锁,只不过延时20秒后去执行。
            if (pq.hasTempMessage()) {
                log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
                this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
                    @Override
                    public void run() {
                        log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
                        RebalancePushImpl.this.unlock(mq, true);
                    }
                }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
            } else {
                this.unlock(mq, true);
            }
            return true;
        }
    

    从代码看的话,如果拿到消费锁,还有消息遗留,说明消费很多次了,这个消息还没有消费完,那么为什么消息并没有被完全消费完还要去释放锁呢?
    因为对于集群模式下的顺序消费而言,如果sendback失败,则会一直在内部进行重试。会导致有消息无法被完全消费。如果一直有消息无法被完全消费就让消费者一直持有锁,会导致类似死锁的情况。所以这里rocketmq在判断已经消费完,还有消息遗留的情况下,会去强制释放broker端的队列锁。

    如何去延长锁的持有时间

    那么如果消费持续时间很长,消费的过程会不会被打断呢?被打断是有可能导致消息无法顺序消费的。那么如何保证?
    这里也与设计层面有关。
    首先像上面的代码,如果队列处于消费中状态的话,处理队列是不会被移除的。
    然后负载服务会不断的对处理队列对应的队列去申请上锁。
    下面还是看看broker端对于上锁的处理。

    public class RebalanceLockManager {
        LockEntry用于记录每个客户端的上锁时间
        static class LockEntry {
            private String clientId;
            private volatile long lastUpdateTimestamp = System.currentTimeMillis();
    
            public String getClientId() {
                return clientId;
            }
    
            public void setClientId(String clientId) {
                this.clientId = clientId;
            }
    
            public long getLastUpdateTimestamp() {
                return lastUpdateTimestamp;
            }
    
            public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
                this.lastUpdateTimestamp = lastUpdateTimestamp;
            }
    
            public boolean isLocked(final String clientId) {
                boolean eq = this.clientId.equals(clientId);
                return eq && !this.isExpired();
            }
    
            public boolean isExpired() {
                boolean expired =
                    (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
    
                return expired;
            }
        }
        用于记录每个队列的锁的持有者
        private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
            new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
       
        用于判断某个客户端是否持有该队列的锁,支持重入---更新持有锁的时间
        private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
            ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
            if (groupValue != null) {
                LockEntry lockEntry = groupValue.get(mq);
                if (lockEntry != null) {
                    boolean locked = lockEntry.isLocked(clientId);
                    if (locked) {
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                    }
                    return locked;
                }
            }
            return false;
        }
    
        某个组的消费者尝试对队列进行上锁
        public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
            final String clientId) {
            Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
            Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
    
            for (MessageQueue mq : mqs) {
                if (this.isLocked(group, mq, clientId)) {
                    保存已经上锁的消息队列
                    lockedMqs.add(mq);
                } else {
                    保存获取不到锁的消息队列
                    notLockedMqs.add(mq);
                }
            }
    
            对获取不到锁的消息队列的处理
            if (!notLockedMqs.isEmpty()) {
                try {
                    this.lock.lockInterruptibly();
                    try {
                        如果获取不到该组的任何信息,先进行初始化
                        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                        if (null == groupValue) {
                            groupValue = new ConcurrentHashMap<>(32);
                            this.mqLockTable.put(group, groupValue);
                        }
                        
                        for (MessageQueue mq : notLockedMqs) {
                            LockEntry lockEntry = groupValue.get(mq);
                            如果不存在该队列的持有者信息,直接上锁,说明没有被上锁
                            if (null == lockEntry) {
                                lockEntry = new LockEntry();
                                lockEntry.setClientId(clientId);
                                groupValue.put(mq, lockEntry);
                            }
                            已经持有了,进行重入,更新上锁时间
                            if (lockEntry.isLocked(clientId)) {
                                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                                lockedMqs.add(mq);
                                continue;
                            }
    
                            String oldClientId = lockEntry.getClientId();
                            如果持有者id与当前消费者id不一样,但是原先持有者已经过期了,直接进行上锁
                            if (lockEntry.isExpired()) {
                                lockEntry.setClientId(clientId);
                                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                                lockedMqs.add(mq);
                                continue;
                            }
                        }
                    } finally {
                        最后解锁
                        this.lock.unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("putMessage exception", e);
                }
            }
           返回已经上锁的消息队列
            return lockedMqs;
        }
    
    }
    

    从代码来看,broker端是允许锁去重入的,其实就是去更新锁的持有时间。
    如果一个消费者消费持续消费了某个队列很久,那么消费服务会不断的去申请上锁,就可以一直保持对该队列的持有,其他消费者也只能等待了。

    相关文章

      网友评论

          本文标题:(十四)顺序消费如何保证消息消费

          本文链接:https://www.haomeiwen.com/subject/ptauvltx.html