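Kafka versions below 0.9 provide a producer API written in Scala. The send logic of that version is analyzed in:
Kafka 0.8.2.1 producer source code analysis
That analysis notes at the end that when the producer does not set a partition key, all messages are sent to a single partition, which severely limits throughput in production. The demo below verifies this.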
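Because Kafka brokers are backward compatible, the test environment only needs a broker that is not older than the producer. Create a test topic with 3 partitions; describing it shows: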
mac:2.3.1 mx$ kafka-topics --describe --zookeeper localhost:2181 --topic test2
Topic:test2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Add the Kafka 0.8.2.1 dependencies to the test producer:
<!-- ======= kafka ======= -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
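A simple producer configuration and send loop:
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

private ProducerConfig buildProperties() {
    Properties properties = new Properties();
    // Encode message values (and, by default, keys) as strings
    properties.put("serializer.class", "kafka.serializer.StringEncoder");
    //properties.put("metadata.broker.list", "10.13.1.24:9092,10.13.1.22:9092,10.13.1.26:9092");
    properties.put("metadata.broker.list", "10.226.245.114:9092");
    return new ProducerConfig(properties);
}
...
public static void main(String[] args) {
    Producer<String, String> producer = new Test().producer();
    for (int i = 0; i < 20; i++) {
        String msg = "e" + i;
        // No key given: KeyedMessage(topic, message)
        producer.send(new KeyedMessage<>("test2", msg));
    }
    producer.close();
}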
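The old producer reads all of its settings from a plain Properties object. The sketch below collects the keys that influence partitioning, using only java.util. The class name OldProducerProps and the brokerList parameter are hypothetical. The values shown are the documented 0.8.x defaults, written out explicitly so they are visible.

```java
import java.util.Properties;

// Hypothetical helper: builds settings for the 0.8.x Scala producer
// (kafka.javaapi.producer.Producer). Only java.util is used here.
public class OldProducerProps {

    static Properties build(String brokerList) {
        Properties p = new Properties();
        p.put("metadata.broker.list", brokerList);
        // Encoder for message values; also used for keys unless key.serializer.class is set
        p.put("serializer.class", "kafka.serializer.StringEncoder");
        // The default partitioner: hashes non-null keys to choose a partition
        p.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        // Default 600000 ms (10 min). For null keys, roughly how long the producer
        // keeps sending to one randomly chosen partition before switching
        p.put("topic.metadata.refresh.interval.ms", "600000");
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092"); // assumed broker address
        p.list(System.out);
    }
}
```

Passing the result to new ProducerConfig(p) would follow the same pattern as buildProperties() above.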
Start a separate consumer (Spring Kafka) that reads each partition on its own, so you can see how the messages are distributed:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class TestKafkaListener {
    private static final Logger log = LoggerFactory.getLogger(TestKafkaListener.class);

    // One listener per partition, each reading from offset 0
    @KafkaListener(id = "c1", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void partition0(String msgData) {
        log.info("demo3 receive : " + msgData + ", partition: 0");
    }

    @KafkaListener(id = "c2", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
    public void partition1(String msgData) {
        log.info("demo3 receive : " + msgData + ", partition: 1");
    }

    @KafkaListener(id = "c3", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))})
    public void partition2(String msgData) {
        log.info("demo3 receive : " + msgData + ", partition: 2");
    }
}
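Eyeballing the log works for 20 messages, but counting is more reliable for larger runs. The sketch below tallies messages per partition from lines in the listener's log format ("... partition: N"). PartitionTally is a hypothetical helper, and it assumes the log text is exactly as logged above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts messages per partition from listener log lines.
public class PartitionTally {
    // Matches the trailing "partition: <n>" of each logged line
    private static final Pattern PARTITION = Pattern.compile("partition: (\\d+)");

    // Returns partition -> message count, sorted by partition; non-matching lines are ignored
    static Map<Integer, Integer> tally(List<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = PARTITION.matcher(line);
            if (m.find()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "demo3 receive : f0, partition: 0",
                "demo3 receive : f1, partition: 1",
                "demo3 receive : f2, partition: 2",
                "demo3 receive : f4, partition: 1");
        System.out.println(tally(sample)); // prints {0=1, 1=2, 2=1}
    }
}
```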
Run the producer. The consumer prints:
demo3 receive : e0, partition: 1
demo3 receive : e1, partition: 1
demo3 receive : e2, partition: 1
demo3 receive : e3, partition: 1
demo3 receive : e4, partition: 1
demo3 receive : e5, partition: 1
demo3 receive : e6, partition: 1
demo3 receive : e7, partition: 1
demo3 receive : e8, partition: 1
demo3 receive : e9, partition: 1
...
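Every message landed in partition 1, which confirms that by default, messages without a key all go to the same partition. The old producer picks a random partition for null-key messages and sticks to it until the next topic metadata refresh (every 10 minutes by default).
Solution:
Supply a partition key when constructing the KeyedMessage. Here the send logic is changed to use the message body itself as the key: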
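The single-partition result follows from a "random, then sticky" choice for null keys. Below is a minimal model of that behavior, assuming a per-topic cache that holds the chosen partition until a metadata refresh clears it. StickyNullKeyModel and its methods are hypothetical. The model ignores leader availability and is not Kafka's source code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical, simplified model of the 0.8.x producer's null-key partition choice.
// It ignores leader availability and is not Kafka's actual implementation.
public class StickyNullKeyModel {
    private final int numPartitions;
    private final Random random;
    // topic -> partition chosen for null-key messages until the next refresh
    private final Map<String, Integer> cache = new HashMap<>();

    StickyNullKeyModel(int numPartitions, long seed) {
        this.numPartitions = numPartitions;
        this.random = new Random(seed);
    }

    // Reuses the cached partition; picks a random one only on a cache miss
    int partitionForNullKey(String topic) {
        return cache.computeIfAbsent(topic, t -> random.nextInt(numPartitions));
    }

    // Stands in for a topic metadata refresh, which discards the cached choice
    void refreshMetadata() {
        cache.clear();
    }

    public static void main(String[] args) {
        StickyNullKeyModel model = new StickyNullKeyModel(3, 42L);
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < 20; i++) {
            used.add(model.partitionForNullKey("test2"));
        }
        // Only one partition appears until refreshMetadata() is called
        System.out.println("partitions used for 20 null-key sends: " + used);
    }
}
```

Under this model, shortening the refresh interval only makes the producer rotate partitions more often. Each window of sends still goes to a single partition.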
public static void main(String[] args) {
    Producer<String, String> producer = new Test().producer();
    for (int i = 0; i < 50; i++) {
        String msg = "f" + i;
        // KeyedMessage(topic, key, message): the message body doubles as the partition key
        producer.send(new KeyedMessage<>("test2", msg, msg));
    }
    producer.close();
}
Output:
demo3 receive : f0, partition: 0
demo3 receive : f1, partition: 1
demo3 receive : f2, partition: 2
demo3 receive : f4, partition: 1
demo3 receive : f7, partition: 1
demo3 receive : f5, partition: 2
demo3 receive : f3, partition: 0
demo3 receive : f10, partition: 1
demo3 receive : f8, partition: 2
demo3 receive : f11, partition: 2
demo3 receive : f14, partition: 2
demo3 receive : f13, partition: 1
...
As expected, the messages are now spread across different partitions.
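Keyed messages spread out because a non-null key is hashed to a partition. The sketch below assumes the default partitioner computes abs(key.hashCode()) % numPartitions. HashModPartitioner is a hypothetical name, and its treatment of Integer.MIN_VALUE is an assumption; Kafka's own handling of negative hash codes may differ. With 3 partitions this model reproduces the assignments in the output above (for example f0 to 0, f3 to 0, f10 to 1, f13 to 1).

```java
// Hypothetical hash-mod partitioner modelling how a non-null key picks a partition.
public class HashModPartitioner {

    // Assumption: map a hash to a non-negative value, sending Integer.MIN_VALUE to 0
    static int nonNegative(int h) {
        return h == Integer.MIN_VALUE ? 0 : Math.abs(h);
    }

    // Same key -> same hash -> same partition
    static int partition(Object key, int numPartitions) {
        return nonNegative(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"f0", "f1", "f2", "f3", "f10", "f13"}) {
            // prints f0 -> 0, f1 -> 1, f2 -> 2, f3 -> 0, f10 -> 1, f13 -> 1
            System.out.println(key + " -> " + partition(key, 3));
        }
    }
}
```

One consequence of hashing is that equal keys always land on the same partition. Per-key ordering is therefore preserved, but a few very frequent keys can still overload a single partition.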