美文网首页
kafka原生 API演示

kafka原生 API演示

作者: 小波同学 | 来源:发表于2019-01-11 21:01 被阅读11次

    人生格言:不敢冒险,才是风险!

    在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f3b

    一、 pom.xml文件,引入依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    二、编写Producer

    1.Producer 配置

    Properties properties = new Properties();
    //kafka server的地址,集群配多个,中间,逗号隔开
    properties.put("bootstrap.servers","localhost:9092");
    //写入kafka时,leader负责一个该partion读写,
    // 当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack
    properties.put("acks", "all");
    //写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
    // 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
    properties.put("retries", 0);
    //produce积累到一定数据,一次发送
    properties.put("batch.size", 16384);
    //当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?
    // 当消息超过linger时间,也会发送
    properties.put("linger.ms", 1);
    //produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
    properties.put("buffer.memory", 33554432);
    //key和value的序列化类
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    • bootstrap.servers:kafka server的地址

    • acks:写入kafka时,leader负责一个该partion读写,当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack。

    • retris:写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。

    • batch.size:produce积累到一定数据,一次发送。

    • buffer.memory: produce积累数据一次发送,缓存大小达到buffer.memory就发送数据。

    • linger.ms :当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?当消息超过linger时间,也会发送。

    • key/value serializer:序列化类。

    2.KafkaProducer

    KafkaProducer kafkaProducer = new KafkaProducer(properties);
    
    ProducerRecord producerRecord = new ProducerRecord("yibo",0,"message","value");
    Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
    RecordMetadata recordMetadata = future.get();
    System.out.println(recordMetadata);
    
    • Producer是一个接口,声明了同步send和异步send两个重要方法。

    • ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。

    三、编写Consumer

    A、自动确认Offset方式:props.put("enable.auto.commit", "true");

    1.Consumer 配置

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    //group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
    props.put("group.id", "testGroup");
    //enable.auto.commit:true --> 设置自动提交offset
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    • group.id:testGroup。由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名。

    • enable.auto.commit:true。设置自动提交offset。

    2.KafkaConsumer

    KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
    kafkaConsumer.subscribe(Arrays.asList("yibo"));
    while(true){
        //读取数据,读取超时时间为100ms
        ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
        }
    }
    

    B、手动确认Offset:props.put("enable.auto.commit", "false");

    1.Consumer 配置

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    //group.id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
    props.put("group.id", "testGroup");
    //enable.auto.commit:true --> 设置手动提交offset
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    

    2.KafkaConsumer

    KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
    kafkaConsumer.subscribe(Arrays.asList("yibo"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while(true){
        //读取数据,读取超时时间为100ms
    
        ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("offset = ["+record.offset()+"], key = ["+record.key()+"], value = ["+record.value()+"]");
        }
    
        /**
         * 数据达到批量要求,就写入DB,同步确认offset
         */
        if (buffer.size() >= minBatchSize) {
            //insertIntoDb(buffer);//这里为插入数据库代码
            kafkaConsumer.commitSync();
            buffer.clear();
        }
    }
    

    kafka原生 API演示完成,本文测试了kafka提供的Api。
    在实际应用中kafka会和spark stream结合,采用流式计算的方式处理kafka中数据。

    参考:
    Kafka Consumer API样例:https://blog.csdn.net/xianzhen376/article/details/51167742/

    Kafka API实践:https://www.jianshu.com/p/ef0fd82b57ab

    Spring Kafka中关于Kafka的配置参数:https://blog.csdn.net/fenglibing/article/details/82117166

    相关文章

      网友评论

          本文标题:kafka原生 API演示

          本文链接:https://www.haomeiwen.com/subject/vjsfdqtx.html