美文网首页
kafka相关配置和JavaAPI

kafka相关配置和JavaAPI

作者: 傻疯子 | 来源:发表于2021-12-10 23:54 被阅读0次
    配置

    server.properties是配置文件

    具体文档可以查看https://kafka.apache.org/24/documentation.html

    数据清空策略
    log.flush.interval.messages分区消息达到设置数后清空到磁盘上
    log.flush.interval.ms间隔时间数据清空到磁盘上

    数据保存策略,达到条件后就会删除
    log.retention.hours数据保存时间
    log.retention.bytes数据保存大小
    log.retention.check.interval.ms数据保存条件检查间隔时间

    生产者数据通讯策略
    acks为1,得到leader收到的消息后,发送下一条
    acks为all,得到所有节点收到的消息后,发送下一条
    acks为0,不需要回复,直接发送
    acks为all,可以保存数据不丢

    同一个group_id消费者数量如果大于分区数,则多余消费者无法得到消息。少于则一个消费者会消费多个分区的数据。

    数据Broker新增后,负载均衡开启和检查时间
    auto.leader.rebalance.enable=true
    leader.imbalance.check.interval.seconds
    手动执行负载均衡命令
    bin/kafka-leader-election.sh --bootstrap-server broker_ip:port --election-type preferred --all-topic-partitions

    JavaAPI

    生产者

          Properties prop = new Properties();
          //指定kafka的broker地址
          prop.put("bootstrap.servers", "broker_ip:port");
          //指定key-value数据的序列化格式
          prop.put("key.serializer", StringSerializer.class.getName());
          prop.put("value.serializer", StringSerializer.class.getName());
    
          //指定topic
          String topic = "kafka_topic"; 
          
          //创建kafka生产者
          KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
    
          //向topic中生产数据
          producer.send(new ProducerRecord<String, String>(topic, "hello kafka"));
    
          //关闭链接
          producer.close();
    

    消费者

          Properties prop = new Properties();
          //指定kafka的broker地址
          prop.put("bootstrap.servers", "broker_ip:port");
          //指定key-value的反序列化类型
          prop.put("key.deserializer", StringDeserializer.class.getName());
          prop.put("value.deserializer", StringDeserializer.class.getName());
          //指定消费者组
          prop.put("group.id", "test");
          //latest表示找不到offset或offset对应的数据就消费最新的,earliest消费最早的,none找不到之前offset抛出异常
          prop.put("auto.offset.reset","latest");
    
          //创建消费者
          KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
          Collection<String> topics = new ArrayList<String>();
          topics.add("kafka_topic");
          //订阅指定的topic
          consumer.subscribe(topics);
    
          while(true) {
             ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
             for (ConsumerRecord<String,String> consumerRecord : poll) {
                System.out.println(consumerRecord);
             }
          }
    

    相关文章

      网友评论

          本文标题:kafka相关配置和JavaAPI

          本文链接:https://www.haomeiwen.com/subject/ybihfrtx.html