美文网首页
idea编写kafka生产者

idea编写kafka生产者

作者: 我爱阿桑 | 来源:发表于2023-05-10 21:45 被阅读0次

kafka的基础demo

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class KafkaProducerExample {

   public static void main(String[] args) throws Exception{

      // 配置Kafka生产者
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");  // acks参数对消息持久化的影响?
      props.put("retries", 0); //重试次数
      props.put("batch.size", 16384); // 批量发送的数据大小
      props.put("linger.ms", 1);  // 发送延迟(默认是0,有消息就发)
      props.put("buffer.memory", 33554432); 
     //  kafka是把消息先放到本地内存中,很多很多个消息缓存成一个batch,
     // 再发送到Broker上去,这样性能才高
     //  buffer.memory 本质就是约束kafka Producer能够使用的内存缓冲的大下的
     //  默认是32M
   
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // 创建Kafka生产者
      Producer<String, String> producer = new KafkaProducer<>(props);

      // 发送消息到Kafka
      String topic = "test-topic";
      String key = "key1";
      String value = "value1";
      producer.send(new ProducerRecord<>(topic, key, value));

      // 关闭Kafka生产者
      producer.close();
   }
}

acks参数对消息持久化的影响?

  • ack : 0 生产者只要发送就是成功
  • ack : 1 只要leader接受到消息,并写入成功,不管Follwer同步没同步
  • ack all 不仅leader写入成功了,ISR里和leader保持同步的也写入成功了才算完成
    设置为all就一定会成功吗
  • 如果只有这个节点,leader死机了,也会失败
  • 所以只有2个以上的副本配合使用时,这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失

kafka 的 buffer.memory 与batch.size参数的区别

  • buffer.memory设置太小,会导致生产者缓存消息的数量太小,发送频繁,导致发送受阻
  • batch.size太小,会导致频繁发送,增加网络开销

kakfa入门demo

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class xync {
    public static void main(String[] args) {
//        新建对象
        Properties properties = new Properties();
        properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put("acks","all");
        properties.put("retries",1);
        properties.put("batch.size",16384);
        properties.put("linger.ms",1);
        properties.put("buffer.memory",33554432);
        properties.put("enable.idempotence","true");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         Producer<String,String> kafkaProducer = new KafkaProducer<>(properties);
//        操作集群
        for (int i = 0; i <100 ; i++) {
            kafkaProducer.send(new ProducerRecord("first",Integer.toString(i),"hello"+i),
                    new Callback(){
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if(recordMetadata != null){
                                System.out.println("发送成功了,发到了"+recordMetadata.topic()+"第"+
                                        recordMetadata.partition()+"分区第"+recordMetadata.offset()+"消息");
                            }
                        }
                    });
        }
//        关闭资源
        kafkaProducer.close();
    }
}

相关文章

网友评论

      本文标题:idea编写kafka生产者

      本文链接:https://www.haomeiwen.com/subject/etpjsdtx.html