Verifying the Default Partitioning Problem in Kafka Versions Below 0.9

Author: matthewfly | Published: 2020-11-09 10:49

Kafka versions below 0.9 ship a producer API implemented in Scala. The send logic of that version is analyzed in:
Kafka 0.8.2.1 producer source code analysis

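As noted at the end of that analysis, when no partition key is set the producer sends all messages to a single partition: it picks one partition at random and keeps using it until the next metadata refresh (topic.metadata.refresh.interval.ms, 10 minutes by default). In production this severely limits system throughput. The demo below verifies the problem.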
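
The keyless behavior described above can be modeled with a small simulation. This is a simplified sketch, not the Kafka source: the class `StickyPartitionSketch` and its methods `choosePartition` and `refreshMetadata` are hypothetical names, and the model assumes the old producer caches one randomly chosen partition per topic and only drops that choice on a metadata refresh.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Simplified model (not Kafka source) of keyless partition selection in the old producer. */
public class StickyPartitionSketch {
    // One cached partition per topic, reused for every keyless message.
    private final Map<String, Integer> cachedPartition = new HashMap<>();
    private final Random random;

    public StickyPartitionSketch(long seed) {
        this.random = new Random(seed);
    }

    /** Returns the cached partition for the topic, picking one at random on first use. */
    public int choosePartition(String topic, int numPartitions) {
        return cachedPartition.computeIfAbsent(topic, t -> random.nextInt(numPartitions));
    }

    /** Models a metadata refresh: the cached choice is dropped, so the next send picks again. */
    public void refreshMetadata() {
        cachedPartition.clear();
    }

    public static void main(String[] args) {
        StickyPartitionSketch producer = new StickyPartitionSketch(42L);
        // Every keyless message to the same topic reuses the same partition.
        for (int i = 0; i < 5; i++) {
            System.out.println("e" + i + " -> partition " + producer.choosePartition("test2", 3));
        }
    }
}
```

In this model a short burst of messages never leaves the first partition chosen, because nothing clears the cache between sends. That is the situation a 20-message test run is in.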

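Because Kafka brokers are backward compatible with older clients, the test environment only needs a broker version newer than the producer version. The test topic, test2, was created with 3 partitions, as the describe command shows: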

mac:2.3.1 mx$ kafka-topics --describe --zookeeper localhost:2181 --topic test2
Topic:test2 PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: test2    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: test2    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: test2    Partition: 2    Leader: 0   Replicas: 0 Isr: 0

The test producer uses the 0.8.2.1 Kafka dependencies:

        <!-- ======= kafka ======= -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>

A simple producer configuration and send loop:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

    // Inside class Test:
    private ProducerConfig buildProperties() {
        Properties properties = new Properties();
        // Encode message payloads as strings.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // Broker used to fetch topic metadata.
        properties.put("metadata.broker.list", "10.226.245.114:9092");
        return new ProducerConfig(properties);
    }
...
    public static void main(String[] args) {
        Producer<String, String> producer = new Test().producer();

        // KeyedMessage(topic, message): no partition key is set.
        for (int i = 0; i < 20; i++) {
            String msg = "e" + i;
            producer.send(new KeyedMessage<>("test2", msg));
        }
        producer.close();
    }

Start a separate consumer that reads each partition individually, so the partition each message lands in can be observed:

@Component
public class TestKafkaListener {

   private static final Logger log = LoggerFactory.getLogger(TestKafkaListener.class);

   @KafkaListener(id = "c_1", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
   public void partition0(String msgData) {
       log.info("demo3 receive : " + msgData + ", partition: 0" );
       System.out.println("demo3 receive : " + msgData + ", partition: 0" );
   }

   @KafkaListener(id = "c2", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
   public void partition1(String msgData) {
       log.info("demo3 receive : " + msgData + ", partition: 1" );
       System.out.println("demo3 receive : " + msgData + ", partition: 1" );
   }

   @KafkaListener(id = "c3", topicPartitions = {@TopicPartition(topic = "test2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))})
   public void partition2(String msgData) {
       log.info("demo3 receive : " + msgData + ", partition: 2" );
       System.out.println("demo3 receive : " + msgData + ", partition: 2" );
   }
}

Run the producer; the consumer prints:

demo3 receive : e0, partition: 1
demo3 receive : e1, partition: 1
demo3 receive : e2, partition: 1
demo3 receive : e3, partition: 1
demo3 receive : e4, partition: 1
demo3 receive : e5, partition: 1
demo3 receive : e6, partition: 1
demo3 receive : e7, partition: 1
demo3 receive : e8, partition: 1
demo3 receive : e9, partition: 1
...

Every message landed in partition 1, which confirms that by default all messages go to the same partition.

Solution:
Add a partition key when constructing the KeyedMessage. The modified send logic below uses the message body as the partition key:

public static void main(String[] args) {
        Producer<String, String> producer = new Test().producer();

        for (int i = 0; i < 50; i++) {
            String msg = "f" + i;
            producer.send(new KeyedMessage<>("test2", msg, msg));
        }
}

Output:

demo3 receive : f0, partition: 0
demo3 receive : f1, partition: 1
demo3 receive : f2, partition: 2
demo3 receive : f4, partition: 1
demo3 receive : f7, partition: 1
demo3 receive : f5, partition: 2
demo3 receive : f3, partition: 0
demo3 receive : f10, partition: 1
demo3 receive : f8, partition: 2
demo3 receive : f11, partition: 2
demo3 receive : f14, partition: 2
demo3 receive : f13, partition: 1
...

The result matches expectations: messages are distributed across different partitions.
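
The partition numbers in the keyed output above can be reproduced by hand. The sketch below assumes the default partitioner in 0.8.x takes the non-negative `hashCode` of the key modulo the partition count; the class `KeyHashPartitionSketch` and the method `partitionFor` are hypothetical names used for illustration, not Kafka APIs.

```java
/** Sketch of hash-based partition selection for keyed messages (hypothetical helper, not Kafka source). */
public class KeyHashPartitionSketch {
    /** Non-negative hash of the key, modulo the partition count. */
    public static int partitionFor(String key, int numPartitions) {
        // Masking the sign bit keeps the result non-negative for any hash value.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"f0", "f1", "f2", "f3", "f4", "f5"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Under this assumption, every key in the output above maps to the partition the consumer reported for it (for example f0 to 0, f1 to 1, f2 to 2, f10 to 1). Identical keys always map to the same partition, so using the message body as the key spreads load only when message bodies vary.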


Article link: https://www.haomeiwen.com/subject/enhlbktx.html