人生格言:不敢冒险,才是风险!
在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f3b
一、 pom.xml文件,引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二、编写Producer
1.Producer 配置
Properties properties = new Properties();
//kafka server的地址,集群配多个,中间,逗号隔开
properties.put("bootstrap.servers","localhost:9092");
//写入kafka时,leader负责一个该partion读写,
// 当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack
properties.put("acks", "all");
//写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
// 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
properties.put("retries", 0);
//produce积累到一定数据,一次发送
properties.put("batch.size", 16384);
//当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?
// 当消息超过linger时间,也会发送
properties.put("linger.ms", 1);
//produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
properties.put("buffer.memory", 33554432);
//key和value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
bootstrap.servers:kafka server的地址
-
acks:写入kafka时,leader负责一个该partion读写,当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack。
-
retris:写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
-
batch.size:produce积累到一定数据,一次发送。
-
buffer.memory: produce积累数据一次发送,缓存大小达到buffer.memory就发送数据。
-
linger.ms :当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?当消息超过linger时间,也会发送。
-
key/value serializer:序列化类。
2.KafkaProducer
KafkaProducer kafkaProducer = new KafkaProducer(properties);
ProducerRecord producerRecord = new ProducerRecord("yibo",0,"message","value");
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata);
-
Producer是一个接口,声明了同步send和异步send两个重要方法。
-
ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。
三、编写Consumer
A、自动确认Offset方式:props.put("enable.auto.commit", "true");
1.Consumer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
props.put("group.id", "testGroup");
//enable.auto.commit:true --> 设置自动提交offset
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
group.id:testGroup。由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名。
-
enable.auto.commit:true。设置自动提交offset。
2.KafkaConsumer
KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Arrays.asList("yibo"));
while(true){
//读取数据,读取超时时间为100ms
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
}
}
B、手动确认Offset:props.put("enable.auto.commit", "false");
1.Consumer 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
props.put("group.id", "testGroup");
//enable.auto.commit:true --> 设置手动提交offset
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2.KafkaConsumer
KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Arrays.asList("yibo"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while(true){
//读取数据,读取超时时间为100ms
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
}
/**
* 数据达到批量要求,就写入DB,同步确认offset
*/
if (buffer.size() >= minBatchSize) {
//insertIntoDb(buffer);//这里为插入数据库代码
kafkaConsumer.commitSync();
buffer.clear();
}
}
kafka原生 API演示完成,本文测试了kafka提供的Api。
在实际应用中kafka会和spark stream结合,采用流式计算的方式处理kafka中数据。
参考:
Kafka Consumer API样例:https://blog.csdn.net/xianzhen376/article/details/51167742/
Kafka API实践:https://www.jianshu.com/p/ef0fd82b57ab
Spring Kafka中关于Kafka的配置参数:https://blog.csdn.net/fenglibing/article/details/82117166
网友评论