美文网首页
kafka原生 API演示

kafka原生 API演示

作者: 小波同学 | 来源:发表于2019-01-11 21:01 被阅读11次

人生格言:不敢冒险,才是风险!

在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f3b

一、 pom.xml文件,引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

二、编写Producer

1.Producer 配置

Properties properties = new Properties();
//kafka server的地址,集群配多个,中间,逗号隔开
properties.put("bootstrap.servers","localhost:9092");
//写入kafka时,leader负责一个该partion读写,
// 当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack
properties.put("acks", "all");
//写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
// 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
properties.put("retries", 0);
//produce积累到一定数据,一次发送
properties.put("batch.size", 16384);
//当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?
// 当消息超过linger时间,也会发送
properties.put("linger.ms", 1);
//produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
properties.put("buffer.memory", 33554432);
//key和value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • bootstrap.servers:kafka server的地址

  • acks:写入kafka时,leader负责一个该partion读写,当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack。

  • retris:写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。

  • batch.size:produce积累到一定数据,一次发送。

  • buffer.memory: produce积累数据一次发送,缓存大小达到buffer.memory就发送数据。

  • linger.ms :当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?当消息超过linger时间,也会发送。

  • key/value serializer:序列化类。

2.KafkaProducer

KafkaProducer kafkaProducer = new KafkaProducer(properties);

ProducerRecord producerRecord = new ProducerRecord("yibo",0,"message","value");
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata);
  • Producer是一个接口,声明了同步send和异步send两个重要方法。

  • ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。

三、编写Consumer

A、自动确认Offset方式:props.put("enable.auto.commit", "true");

1.Consumer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
props.put("group.id", "testGroup");
//enable.auto.commit:true --> 设置自动提交offset
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  • group.id:testGroup。由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名。

  • enable.auto.commit:true。设置自动提交offset。

2.KafkaConsumer

KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Arrays.asList("yibo"));
while(true){
    //读取数据,读取超时时间为100ms
    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
    }
}

B、手动确认Offset:props.put("enable.auto.commit", "false");

1.Consumer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
props.put("group.id", "testGroup");
//enable.auto.commit:true --> 设置手动提交offset
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2.KafkaConsumer

KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Arrays.asList("yibo"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while(true){
    //读取数据,读取超时时间为100ms

    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
    }

    /**
     * 数据达到批量要求,就写入DB,同步确认offset
     */
    if (buffer.size() >= minBatchSize) {
        //insertIntoDb(buffer);//这里为插入数据库代码
        kafkaConsumer.commitSync();
        buffer.clear();
    }
}

kafka原生 API演示完成,本文测试了kafka提供的Api。
在实际应用中kafka会和spark stream结合,采用流式计算的方式处理kafka中数据。

参考:
Kafka Consumer API样例:https://blog.csdn.net/xianzhen376/article/details/51167742/

Kafka API实践:https://www.jianshu.com/p/ef0fd82b57ab

Spring Kafka中关于Kafka的配置参数:https://blog.csdn.net/fenglibing/article/details/82117166

相关文章

网友评论

      本文标题:kafka原生 API演示

      本文链接:https://www.haomeiwen.com/subject/vjsfdqtx.html