美文网首页
Kafka/RocketMQ生产者路由对比

Kafka/RocketMQ生产者路由对比

作者: 进击的蚂蚁zzzliu | 来源:发表于2021-06-09 18:44 被阅读0次

一、Kafka分区机制

Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。


kafka分区.png
1. 默认分区策略
  • 如果指定了Key,那么默认实现按消息键保序策略,即相同Key 的消息会发送到同一个Partition 中;
  • 如果没有指定Key,则在所有Partition 中使用轮询策略;
2. 自定义策略

如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class,并编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口的partition方法;

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)

  • 随机策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
  • 消息键保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
  • 根据Broker的特殊信息分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
3. 重试机制

调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
如果发送失败,那么判断是否允许重试。如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;

二、RMQ发送消息路由机制

RMQ的消息组织方式为:主题-队列-消息。主题下的每条消息只会保存在某一个队列中,而不会在多个队列中被保存多份。


1. 默认路由策略
public SendResult send(Message msg)

使用该方法发送消息时默认使用轮询策略;Producer从namesrv获取的到Topic_A路由信息TopicPublishInfo

// 主题的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每选择一次消息队列,该值会自增1,达到Integer.MAX_VALUE则重置为0,用于选择消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询;

2. 自定义路由策略
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)

使用该方法发送时需要实现MessageQueueSelector接口的select方法实现自定义路由策略;
例如:

  • 通过指定值hash
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int value = arg.hashCode();
    if (value < 0) {
        value = Math.abs(value);
    }
    value = value % mqs.size();
    return mqs.get(value);
}
  • 随机
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int value = random.nextInt(mqs.size());
    return mqs.get(value);
}
3. 重试机制
  • 对于send(Message msg)方法发送的消息,如果发送失败,默认重试2次,RocketMQ选择队列默认是通过MQFaultStrategy#selectOneMessageQueue来选择一个的队列,在未开启延迟容错的情况下,内部会调用TopicPublishInfo#selectOneMessageQueue方法
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 消息第一次发送,直接轮询
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 消息发送失败重试,优先选择其他broker上的队列
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // 没有其他broker可选,依然轮询,有可能发到之前失败的broker上
        return selectOneMessageQueue();
    }
}
  • 对于自定义路由策略或者指定MessageQueue发送的消息
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
public SendResult send(Message msg, MessageQueue mq)

只能根据自定义策略发送到特定的Broker上的某个特定的Queue中,如果发送失败,重试失败的可能依然很大,所以默认不进行重试。如果需要重试,需要业务方自己来做,例如通过一个for循环最多重试几次。

----------over---------

相关文章

网友评论

      本文标题:Kafka/RocketMQ生产者路由对比

      本文链接:https://www.haomeiwen.com/subject/zsxreltx.html