美文网首页
java往有账号密码验证的Kafka生产数据

java往有账号密码验证的Kafka生产数据

作者: __元昊__ | 来源:发表于2020-07-17 10:13 被阅读0次
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Optional;
    import java.util.Properties;
    
    public class KafkaUtils {
        public static void kafkaProducer(String topic, String username, String password) {
            String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
            String jaasCfg = String.format(jaasTemplate, username, password);
    
            Properties props = new Properties();
            props.put("security.protocol","SASL_PLAINTEXT");
            props.put("sasl.mechanism","SCRAM-SHA-256");
            props.put("sasl.jaas.config", jaasCfg);
            props.put("bootstrap.servers",
                    "10.221.124.13:9092,10.221.124.14:9092,10.221.124.15:9092");//该地址是集群的子集,用来探测集群。
            props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
            props.put("retries", 3);// 请求失败重试的次数
            props.put("batch.size", 16384);// batch的大小
            props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
            props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String,String> kafkaProducer = null;
            try {
                kafkaProducer = new KafkaProducer<>(props);
                kafkaProducer.send(new ProducerRecord<String, String>(topic, "1", "1"));
                System.out.println("发送成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                Optional.ofNullable(kafkaProducer).ifPresent(KafkaProducer::close);
            }
        }
    
        public static void main(String[] args) {
            kafkaProducer("topic","username","password");
        }
    }
    

    相关文章 https://blog.csdn.net/u012842205/article/details/73188534

    相关文章https://github.com/CloudKarafka/java-kafka-example/blob/master/src/main/java/KafkaExample.java

    相关文章

      网友评论

          本文标题:java往有账号密码验证的Kafka生产数据

          本文链接:https://www.haomeiwen.com/subject/uawwhktx.html