美文网首页
Java连接kafka + kerberos

Java连接kafka + kerberos

作者: halfempty | 来源:发表于2021-11-30 10:33 被阅读0次

    当集群开启 Kerberos 认证后, Java 客户端连接 Kafka 时需要额外设置部分安全相关参数 (JAAS/krb5 系统属性及 SASL 配置), 详见以下样例

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class App {
    
        public void produce() {
            System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:/krb5.conf");
    
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
            TestCallback callback = new TestCallback();
    
            String topic = "xn-test";
            for (int i = 0; i < 5; i++) {
                System.out.println("--------------" + i);
                producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), "MSG-" + i), callback);
            }
            producer.close();
        }
    
        public void consume() {
            System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:/krb5.conf");
    
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "cc1");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("sasl.mechanism", "GSSAPI");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("xn-test"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10);
                for(ConsumerRecord<String, String> record: records) {
                    System.out.printf("%s-%s-%s\n", record.partition(), record.offset(), record.value());
                }
            }
        }
    
        public static void main(String[] args) {
            App app = new App();
            app.produce();
    //        app.consume();
        }
    
        public static class TestCallback implements Callback {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println("Error...");
                    e.printStackTrace();
                } else {
                    String message = String.format("sent message to topic:%s partition:%s  offset:%s",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                    System.out.println(message);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Java连接kafka + kerberos

          本文链接:https://www.haomeiwen.com/subject/pkqetrtx.html