美文网首页
idea编写kafka生产者

idea编写kafka生产者

作者: 我爱阿桑 | 来源:发表于2023-05-10 21:45 被阅读0次

    kafka的基础demo

    import java.util.Properties;
    import org.apache.kafka.clients.producer.*;
    
    public class KafkaProducerExample {
    
       public static void main(String[] args) throws Exception{
    
          // 配置Kafka生产者
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("acks", "all");  // acks参数对消息持久化的影响?
          props.put("retries", 0); //重试次数
          props.put("batch.size", 16384); // 批量发送的数据大小
          props.put("linger.ms", 1);  // 发送延迟(默认是0,有消息就发)
          props.put("buffer.memory", 33554432); 
         //  kafka是把消息先放到本地内存中,很多很多个消息缓存成一个batch,
         // 再发送到Broker上去,这样性能才高
         //  buffer.memory 本质就是约束kafka Producer能够使用的内存缓冲的大下的
         //  默认是32M
       
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
          // 创建Kafka生产者
          Producer<String, String> producer = new KafkaProducer<>(props);
    
          // 发送消息到Kafka
          String topic = "test-topic";
          String key = "key1";
          String value = "value1";
          producer.send(new ProducerRecord<>(topic, key, value));
    
          // 关闭Kafka生产者
          producer.close();
       }
    }
    

    acks参数对消息持久化的影响?

    • ack : 0 生产者只要发送就是成功
    • ack : 1 只要leader接受到消息,并写入成功,不管Follwer同步没同步
    • ack all 不仅leader写入成功了,ISR里和leader保持同步的也写入成功了才算完成
      设置为all就一定会成功吗
    • 如果只有这个节点,leader死机了,也会失败
    • 所以只有2个以上的副本配合使用时,这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失

    kafka 的 buffer.memory 与batch.size参数的区别

    • buffer.memory设置太小,会导致生产者缓存消息的数量太小,发送频繁,导致发送受阻
    • batch.size太小,会导致频繁发送,增加网络开销

    kakfa入门demo

    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class xync {
        public static void main(String[] args) {
    //        新建对象
            Properties properties = new Properties();
            properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
            properties.put("acks","all");
            properties.put("retries",1);
            properties.put("batch.size",16384);
            properties.put("linger.ms",1);
            properties.put("buffer.memory",33554432);
            properties.put("enable.idempotence","true");
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
             Producer<String,String> kafkaProducer = new KafkaProducer<>(properties);
    //        操作集群
            for (int i = 0; i <100 ; i++) {
                kafkaProducer.send(new ProducerRecord("first",Integer.toString(i),"hello"+i),
                        new Callback(){
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                if(recordMetadata != null){
                                    System.out.println("发送成功了,发到了"+recordMetadata.topic()+"第"+
                                            recordMetadata.partition()+"分区第"+recordMetadata.offset()+"消息");
                                }
                            }
                        });
            }
    //        关闭资源
            kafkaProducer.close();
        }
    }
    

    相关文章

      网友评论

          本文标题:idea编写kafka生产者

          本文链接:https://www.haomeiwen.com/subject/etpjsdtx.html