美文网首页
Kafka/RocketMQ顺序消息对比

Kafka/RocketMQ顺序消息对比

作者: 进击的蚂蚁zzzliu | 来源:发表于2021-06-15 14:42 被阅读0次

    一、Kafka顺序消息

    • Producer端:Kafka的顺序消息是通过partition key,将某类消息(例如同一笔订单的不同状态)写入同一个partition,因此Kafka只能保证消息在同一个partition内有序,无法保证全局有序;
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return Math.abs(key.hashCode()) % partitions.size();
    
    • Consumer端:Kafka Java Consumer是单线程的设计(多线程方案需要业务端自己实现),即一个partition只能对应一个消费线程,因此可以保证消息被顺序消费;

    二、RocketMQ顺序消息

    • Producer端:RMQ顺序消息跟Kafka类似,通过消息路由机制把消息发送到指定的MessageQueue中(参考Kafka/RocketMQ生产者路由对比);
    • Consumer端:RMQ Java Consumer是线程池的设计,因此在集群模式下消费的顺序消费,需要通过一系列设计来保证;
    1. Consumer 注册 MessageListenerOrderly
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_1");
        consumer.setNamesrvAddr("192.168.0.99:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // TODO
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
    
    2. 定时向broker发送锁住当前正在消费的队列集合的消息

    2.1 Consumer 启动时根据消息监听器类型创建监听服务

    // 根据是否顺序消费,创建消费端消费线程服务;ConsumeMessageService主要负责消息消费,内部维护线程池;
    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
        this.consumeOrderly = true;
        this.consumeMessageService =
            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    }
    this.consumeMessageService.start();
    

    2.2 ConsumeMessageOrderlyService.start 启动定时任务(默认频率20s)

    public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            // 如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,
            // 具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
    

    2.3 向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。consumer收到后,设置是否锁住标志位。保证broker中的每个消息队列只对应一个消费端。

    public void lockAll() {
        // broker -> broker上的 MessageQueue(当前Consumer消费的MessageQueue)
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            final String brokerName = entry.getKey();
            final Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty())
                continue;
    
            // 主要获取 broker 地址
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                // 组装批量锁定请求
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                requestBody.setConsumerGroup(this.consumerGroup);
                requestBody.setClientId(this.mQClientFactory.getClientId());
                requestBody.setMqSet(mqs); // MessageQueue
    
                try {
                    // 发送请求到Broker,Broker返回锁定的MessageQueue集合
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
    
                    for (MessageQueue mq : lockOKMQSet) {
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            // Broker锁定的集合,在本地加锁,后面拉取消息消费时会用到
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    ......
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
    
    3. 消费消息时通过锁实现串行执行

    3.1 DefaultMQPushConsumerImpl.pullMessage 拉取消息提交到ConsumeMessageOrderlyService的线程池consumeExecutor中执行

    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
    

    3.2 ConsumeMessageOrderlyService.ConsumeRequest.run 消费消息,消费时对消费队列进行加锁,保证同一个消费队列中的多条消息会串行执行;

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }
    
        // 获取当前 MessageQueue 的锁
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            // 广播模式 或者 ProcessQueue上锁(lockAll()进行上锁)并且锁没有过期,否则延迟 10ms 再执行
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }
                    // ProcessQueue 未上锁
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }
                    // ProcessQueue 锁过期
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }
    
                    // 消费任务一次运行的最大时间。可以通过-Drocketmq.client.maxTimeConsumeContinuously来设置,默认为60s。
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }
    
                    // 消费批次大小,默认为1, 也就是一个一个消费,实际生产环境可以调整大
                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    
                    // 从treeMap里面依次获取对应数量的消息出来,取得时候加读写锁
                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (!msgs.isEmpty()) {
                        ......
                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            // 队列锁---防止顺序消息被重复消费
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }
                            // 进行消息消费,返回消费结果
                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            // 消费锁释放
                            this.processQueue.getLockConsume().unlock();
                        }
                        ......
                        // 处理消费结果
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                // 当队列没有上锁,那么会走这一块,然后进行上锁,这块最终又会重新执行到上面的代码里面去
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
    
    4. 小结

    1. 通过ReblanceImp的lockAll方法,每隔一段时间定时锁住当前消费端正在消费的队列。设置本地队列ProcessQueue的locked属性为true。保证broker中的每个消息队列只对应一个消费端;
    2. 消费端也是通过锁,保证每个ProcessQueue只有一个线程消费;
    3. 当新增消费者或者减少消费者,消费者数量变更的时候,会触发负载均衡,客户端会重新计算消费的队列,这个时候会把不需要再消费的队列的ProcessQueue上的锁释放掉,同时还是去borker里面对新消费的队列进行上锁,如果上锁失败,那么这个队列的消息是不能消费的,只有上锁成功才能被消费;
    -------------over-----------

    相关文章

      网友评论

          本文标题:Kafka/RocketMQ顺序消息对比

          本文链接:https://www.haomeiwen.com/subject/hmxreltx.html