一 安装及简单使用
kafka可以安装在windos mac linux上,以mac为例
mac 安装步骤:
$ brew install kafka
$ zkServer start
$ brew services start kafka
创建主题:
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
使用产生者发送消息:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
TEST
TEST2
TEST3
TEST4
使用消息者接收消息:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
TEST
TEST2
TEST3
TEST4
二 使用Java客户端
首先需要引入kafka的包:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
2.1 生产者
简单示例
Properties properties = new Properties();
properties.put("bootstrap.servers", url);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, value);
kafkaProducer.send(producerRecord);
kafkaProducer.send(producerRecord); 这个方法会同步发送消息,如果需要异步,可以使用下面的方法:
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//handler result......
}
});
2.2 消费者
public class MKafkaConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "group.default");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties,
new StringDeserializer(), new StringDeserializer());
kafkaConsumer.subscribe(Collections.singletonList("test")); // A
try {
while (true) { // B
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100)); // C
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
}
} finally {
kafkaConsumer.close();
}
}
}
A处代码订阅主题,B处轮询,C处获取消息,最多阻塞100ms,100ms后没有消息会返回空。
三 生产者详解
相关配置
- acts:指定至少要有多少个分区副本收到消息,生产者才会认为消息写入成功。为0时生产者不会等待服务器返回结果;为1时只有集群首领写入成功,服务器就会返回。acts=all时,要求所有分区都要收到消息。
- buffer.memory:生产者内存缓冲大小
- compression.type: 消息压缩方式。
gzip: 压缩效果好,占用CPU高
snappy: 压缩效果不如gzip,但是占用CPU低
retries: 重试次数;retry.backoff.ms重试时间间隔 - batch.size: 当有多个消息要被发送到同一个分区时,会先放在一个批次里,然后同一批次的一起了送。这个数据指定批次大小。
- linger.ms: 指定同一批次等待的时间。如果同一批次等待这个时间后没有新消息加入,则发送这个批次。
- max.in.flight.requests.per.connection: 消息并发发送的最大数量
- request.timeout.ms:发送数据时等待服务器返回的超时时间
- metadata.fetch.timeout.ms:获取元数据的超时时间
控制消息发送的顺序:需要把max.in.flight.requests.per.connection设置为1,即串行发送。
分区控制
kafka的消息分为健和值,值就是真正的消息内容。健可以作为消息的附加体,也会被用来分区,kafka的默认分区规则如下:
当健值为null时,会随机分配可用分区
当健值为不为null时,会对健进行散列,然后根据散列值来映射到所有分区上
由于是根据散列值来映射的,所以只有分区数量不变的情况下才能保证散列值相同的消息被分配到同一分区上。
以上说的是默认的分区器,我们可以自定义分区器,只需要实现Partitioner即可
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//计算分区并反回
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
四 消费者详解
4.1 消费者和消费群组
假如我们有一个kafka主题,生产者生产消息的速度是1000QPS,这时候我们只需要使用一台机器消费消息即可,这一台机器就是一个消费者。
但是如果生产者生产消息的速度是10 0000QPS,用一台机器肯定是不够的,我们可以用10台,每台消费其中1/10的消息,那这十台就组成了一个消费群组。上面的例子中我们指定了消费者所属的群组:
properties.put("group.id", "group.default");
这里举的是机器维度的例子,实际上,我们可以在同一个机器的同一个服务中启动多个消费者。
同一个消费群组的机器会分别消费主题中的不同分区,例如我们的主题有10个分区:
- 群组中只有一个消费组者,这个消费者消费所有的分区。
- 群组中有5个消费者,每个消费者分别消费2个分区。
- 群组中有10个消费者,每个消费者分别消费1个分区。
- 群组中有大于10个消费者,只会有10个消费者消费消息,其余的将处于空闲状态
同一个主题可以有多个消费者群组,那么每一个消费者群组会消费主题中的所有消息。
4.2 分区分配策略
上面说过,群组中每一个消费者会消费特定的分区,分区与消费者的对应关系就由分区的分配策略决定。
在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:
- 同一个消费群组内新增消费者
- 消费者离开当前所属的消费群组
- 订阅的主题新增分区
关于这两种分配策略,这里有篇文章写得不错:Kafka分区分配策略 这里简单总结一下:
- Range策略:独立的分配每个主题,使每个主题的分区尽量平均的分配到各个消费者中。如10个分区三个消费者,分出来的结果是三个消费都分别消费4,3,3个分区。
- RoundRobin策略:上面的Range策略在只消费一个主题时还是很平均的,但是如果一个群组订阅了100个主题,每个都有10个分区,由于上面的算法是独立分配每个主题的,所以分出来的可能就是400,300,300
RoundRobin策略会考虑多个主题,所以分配出来的会是334,333,333。更加均匀。
4.4 消费者群组是如何协调的
上面说了分区的分配策略,那么这个策略由谁来执行,由谁来触发更新呢,这个就是消费组协调策略:
- 当消费者群组中第一个消费者启动时,会向集群申请一个broker作为协调器。集群会指定一个负载比较低的作为协调器。
- 协调器指定完成后,所有的消费者都需要向协调器发送心跳用来表明自己处于存活状态,如果长时间没有发送心跳,协调器会认为该节点已经死亡。
- 所有消费者都向协调器发送心跳后协调器就有了所有消费者的信息,之后群主就会通过协调器获取到所有的消费者信息后分配分区,然后把分配结果通过协调器发送给各个消费者。
4.3 消费者相关配置
- fetch.min.bytes : 指定消费者从服务器上获取记录的最小字节数。
- fetch.max.wait.ms :指定消费者获取记录的等待时间。
- max.partition.fetch.bytes:指定消费者从每个分区可以获取到的最大字节数。
- session.timeout.ms:指定协调器多长时间没有收到消费者的心跳后就会认为节点死亡。
- auto.offset.reset:当一个新消费者加入群组时,从哪个消息开始读,可以latest: 最新消息开始读,earlist: 最早消息开始读
- enable.auto.commit: 是否自动提交偏移量,默认为true,设置成false可以更好的保证重复消费和数据丢失。
4.4 偏移量提交
偏移量的作用:用于记录消费者消费到的位置,用于避免重复消费和消息丢失。
消费者提交偏移量的途径:向_consumer_offset的特殊主题发送消息。
自动提交
如果enable.auto.commit被设置为true,则每过一定时间(通过auto.commit.interval.ms来设置,默认5s),会把poll()接收到的最大偏移量提交上去。
缺点:这种方式不是实时的,所有会出现重复消费的情况,减小auto.commit.interval.ms可以降低出现的概率,但是不能完全避免。
手动同步提交
可以手动调用commitSync()方法来手动提交,这个方法会把poll()方法返回的最新偏移量提交。
如果提交失败,则会一直重试,直到成功为止。
缺点:由于是同步的,所以会阻塞,降低吞吐量。
手动异步提交
可以手动调用commitAsync()方法来手动异步提交,同样会把最新的偏移量提交,但是不会失败重试,因为会有并发问题。
缺点:存在并发问题。
手动提交的偏移量默认是poll方法返回的新大值,但是也可以手动指定,加个参数即可。
4.5 消息回溯
seek(TopicPartition, Long)方法可以用于回溯消息,该方法指定从哪开始读取kafka的消费,下次poll时会从这个点开始读取。
4.6 再均衡监听器
kafka提供了再均衡监听器的机制,可以在rebalance时获取到通知,进行一些如关闭连接等操作。
再均衡监听器只需要实现ConsumerRebalanceListener。然后要subscribe时传入参数即可
kafkaConsumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//此方法会在再均衡开始之前调用
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//此方法会在分区重新分配之后调用
}
});
网友评论