Kafka简介
-
Kafka是linkedin公司使用Scala语言编写的一款具有高水平扩展和高吞吐量的分布式消息系统
-
Kafka强依赖zookeeper,无论是Kafka集群,生产者(producer)和消费者(consumer)都依赖于zookeeper(zk)来保证系统可用性
-
支持数据存储、集群与zk动态扩容,但仅支持仿AMQP协议
kafka架构
相关概念
-
Broker
一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic; -
Producer
消息生产者,就是向kafka broker发消息的客户端 -
Consumer
消息消费者,向kafka broker取消息的客户端 -
Topic(消息队列)
是数据主题,是kafka中用来代表一个数据流的抽象。发布数据时,可以用topic对数据进行分类,也作为订阅数据时的主题。一个topic同时可有多个producer、consumer -
Consumer Group (CG)
kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段,一个topic可以有多个CG
-
Partition(数据分片)
每个partition是一个顺序的、不可变的record序列,partition中的record被分配一个自增长id(offset) -
Offset
Partition中自增长id序列
分片:将一份完整的数据拆分成多个,分开存储;提高并发度,负载均衡;降低丢失数据的风险;
副本:将数据复制一份出来存储,提高读取并发度(降低写入性能),提高数据可用性,主要用来解决服务器故障导致的数据丢失问题
Leader:主副本,数据读写
Follower:从副本,从主副本同步过来的数据
-
Record (记录)
每条记录都有key、value、timestamp、offset 信息
key时用来处理数据分发,分发规则类似 hash(key) % partition_count ;key为null则轮询partition,key固定一个则只会将数据发送到一个partition中
Kafka 主要参数
公共参数
-
bootstrap.servers
kafka服务器连接地址 -
key.serializer
key 序列化接口配置;如:org.apache.kafka.common.serialization.Serializer -
value.serialize
value序列化接口配置;可以和key.serializer相同,也可以不同,只要消费端保持一致
Producer API
-
buffer.memory
用于设置缓存消息的缓冲区大小,单位是字节,默认值是 33554432,即 32MB -
batch.size
发往同一分区的多条消息封装的批次大小,默认16k,控制发送数据的吞吐量 -
max.request.size
发送单条最大消息大小,默认的 1048576 字节(1MB) -
linger.ms
缓冲区中的数据在达到batch.size前,需要等待的时间,默认值0 -
acks
用来配置请求成功的标准,默认1;acks分别是0、1、all
0: 数据发送出去
1: 数据发送出去,并存储在Leader(主副本)
all: 数据发送出去,并存储在Leader(主副本)并同步所有Follower(从副本)
-
retries
发送消息失败重试的次数,默认值是 0,表示不进行重试(重试可能造成消息的重复发送、消息的乱序)
-
request.timeout.ms
发送消息响应时间,默认30 秒,超过时间无响应抛TimeoutException异常
Consumer API
-
group.id
消费组,同一份数据可以由多个消费组来消费 -
auto.offset. reset
数据消费的位置
earliest:指定从最早的位移开始消费
latest:指定从最新处位移开始消费
none :指定如果未发现位移信息或位移越界,则抛出异常 -
enable.auto.commit
offset提交方式,用来提交位移;ture自动提交,false手动提交 -
session.timeout.ms
消费者组协调者(group coordinator) 检测消费组(group)失败的时间,导致不必要的 rebalance(数据平衡);默认10 秒 -
max.poll.interval.ms
设置消息处理逻辑的最大时间,超过则会对consumer 崩溃检测,做rebalance(数据平衡) -
max.partition.fetch.bytes
指定了服务器从每个分区里返回给消费者的最大字节数,默认值是lMB -
fetch.max.bytes
consumer 端单次获取数据的最大字节数,单次超过则无法消费 -
max.poll.records
单次 poll 调用返回的最大消息数;默认的 500 条 -
heartbeat.interval.ms
consumer 间的心跳检测,做数据平衡用 -
connections.max.idle.ms
关闭空闲 Socket 连接时间;当前默认值是 9 分钟;不关闭设置-1
Consumer相关
1. 客户端一定要配置kafka服务端的hosts,即使你使用的是IP来访问的
2. Commit Offset的作用是为了将最新消费的位置保留在服务端.
很多人认为,Commit offset的作用是让consumer在下次消费时,知道该从哪里继续消费,这种想法是错误的。commit offset只作为group下次启动时消费的起始位置
-
自动 Offset Commite
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.90.131:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
System.out.println("getting...");
//每次poll时,会将上次的get到的数据commit offset
ConsumerRecords<String, String> records = consumer.poll(8000);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
自动提交offset,具体是什么时候提交呢?
consumer关闭 或 下次poll时,会将上次poll的消息的offset commit
可能导致重复消费:
成功消费一批消息后,commit offset前 consumer程序挂了,可能导致重复消费
-
手动 Offset Commite
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.90.131:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
final int minBatchSize = 20;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
System.out.println("getting...");
ConsumerRecords<String, String> records = consumer.poll(4000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("received msg:" + record.value());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertToDB(buffer); //打印出内容
System.out.println("======》》》offset commit。。。");
consumer.commitSync(); //手动提交offset
buffer.clear();
}
}
在这种场景下,使用手动提交,还可能导致消息遗漏:在数据插入DB之后而宕机,程序下次启动时就会从上次commit的offset开始消费所以这种场景下,kafka提供的消费语义是at-least-once;需要采用事物来控制commit或在外部存储offset;
3. 外部存储offset
producer不是一定要将offset保存在Kafka中的,他们可以在自己的存储中保留offset,这样做可以允许程序在同一系统中落地数据并存储offset;
1、关闭自动提交enable.auto.commit=false
2、存储ConsumerRecord中提供的offset
3、重启时,使用seek方法恢复consumer的消费位置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.90.131:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
//kafka的分区逻辑是在poll方法里执行的,所以执行seek方法之前先执行一次poll方法
//获取当前消费者消费分区的情况
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
//如果没有分配到分区,就一直循环下去
kafkaConsumer.poll(100L);
assignment = consumer.assignment();
}
for (TopicPartition tp : assignment) {
//消费第当前分区的offset为10的消息
kafkaConsumer.seek(tp, 10);
}
while (true) {
System.out.println("getting...");
ConsumerRecords<String, String> records = consumer.poll(4000);
System.out.println("拉取的消息数量:" + records .count());
System.out.println("消息集合是否为空:" + records .isEmpty());
}
Spring集成Kafka
- 依赖包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
- 生产者Producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
@Component
public class Producer {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
/*
//SASL Plaintest 权限
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"123456\";")
*/
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Autowired
private KafkaTemplate<String, String> template;
public void send(String data) {
this.template.send("test_topic", data);
}
}
- 消费者
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
@Configuration
@EnableKafka
public class TestConsumerConfig {
@Value("${kafka.test-consumer.username}")
private String username;
@Value("${kafka.test-consumer.password}")
private String password;
@Bean
public KafkaListenerContainerFactory<?> testFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId)));
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(15000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
private Map<String, Object> consumerConfigs(String groupId) {
Map<String, Object> propsMap = new HashMap<>(20);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test_01");
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES * 10);
/*
//SASL Plaintest 权限
String config = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",username,password);
propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
propsMap.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
propsMap.put("sasl.jaas.config",config);
*/
return propsMap;
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class Consumer{
@KafkaListener(topics = "test_topic", containerFactory = "testFactory")
public void handMessage(List<ConsumerRecord<?, ?>> recordList, Acknowledgment ack) {
try{
for (ConsumerRecord<?, ?> record : recordList) {
......
}
ack.acknowledge();
}catch (Exception e){
......
}
}
}
Kafka 限流
kafka客户端是认证的,那么可以使用userId和clientId两种认证方式。如果没有认证只能使用clientId限流
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1024' --entity-type clients --entity-name clientA
对clientId=clientA的客户端添加限流设置。producer_byte_rate表示每秒最多能写入到消息量,单位为byte/sec。consumer_byte_rate表示每秒最多能消费的消息了,单位也为byte/sec。设置后立即生效。
Producer & Consumer设置
// Producer:
props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientA");
// Consumer:
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "clientA");
查看用户流量限制
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name clientA
输出结果显示如下:
Configs for user-principal ‘clientA’ are producer_byte_rate=1024,consumer_byte_rate=2048
Kafka Tool 工具
-
下载地址
http://www.kafkatool.com/download.html
-
注意事项
- 安装目录最好不要有带空格的文件路径下,在带参数启动时会报参数命令无效
- hosts 文件中,添加入 kafka 的集群域名,是要重启KafkaTools才会生效
- 在多集群下,bootstrap servers如果设置了,必须把所有的地址设置上
-
SASL加密连接
- 客户端 jass.conf 配置文件
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
};
- 新版本解决需要配置文件的问题
网友评论