一、Kafka分区机制
Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
kafka分区.png
1. 默认分区策略
- 如果指定了Key,那么默认实现按消息键保序策略,即相同Key 的消息会发送到同一个Partition 中;
- 如果没有指定Key,则在所有Partition 中使用轮询策略;
2. 自定义策略
如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class,并编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口的partition方法;
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)
- 随机策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 消息键保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
- 根据Broker的特殊信息分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
3. 重试机制
调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
如果发送失败,那么判断是否允许重试。如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
二、RMQ发送消息路由机制
RMQ的消息组织方式为:主题-队列-消息。主题下的每条消息只会保存在某一个队列中,而不会在多个队列中被保存多份。
1. 默认路由策略
public SendResult send(Message msg)
使用该方法发送消息时默认使用轮询策略;Producer从namesrv获取的到Topic_A路由信息TopicPublishInfo
// 主题的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每选择一次消息队列,该值会自增1,达到Integer.MAX_VALUE则重置为0,用于选择消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询;
2. 自定义路由策略
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
使用该方法发送时需要实现MessageQueueSelector接口的select方法实现自定义路由策略;
例如:
- 通过指定值hash
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
- 随机
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
3. 重试机制
- 对于
send(Message msg)
方法发送的消息,如果发送失败,默认重试2次,RocketMQ选择队列默认是通过MQFaultStrategy#selectOneMessageQueue来选择一个的队列,在未开启延迟容错的情况下,内部会调用TopicPublishInfo#selectOneMessageQueue方法
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 消息第一次发送,直接轮询
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 消息发送失败重试,优先选择其他broker上的队列
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 没有其他broker可选,依然轮询,有可能发到之前失败的broker上
return selectOneMessageQueue();
}
}
- 对于自定义路由策略或者指定MessageQueue发送的消息
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
public SendResult send(Message msg, MessageQueue mq)
只能根据自定义策略发送到特定的Broker上的某个特定的Queue中,如果发送失败,重试失败的可能依然很大,所以默认不进行重试。如果需要重试,需要业务方自己来做,例如通过一个for循环最多重试几次。
----------over---------
网友评论