开启 Kerberos 认证后，Java 客户端连接 Kafka 时需要调整部分参数，详见下面的示例：
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import java.util.Arrays;
import java.util.Properties;
public class App {
public void produce() {
System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
System.setProperty("java.security.krb5.conf", "D:/krb5.conf");
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
TestCallback callback = new TestCallback();
String topic = "xn-test";
for (int i = 0; i < 5; i++) {
System.out.println("--------------" + i);
producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), "MSG-" + i), callback);
}
producer.close();
}
public void consume() {
System.setProperty("java.security.auth.login.config", "D:/kafka_client_jaas.conf");
System.setProperty("java.security.krb5.conf", "D:/krb5.conf");
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xn141:6667,xn142:6667,xn143:6667");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cc1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.mechanism", "GSSAPI");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("xn-test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for(ConsumerRecord<String, String> record: records) {
System.out.printf("%s-%s-%s\n", record.partition(), record.offset(), record.value());
}
}
}
public static void main(String[] args) {
App app = new App();
app.produce();
// app.consume();
}
public static class TestCallback implements Callback {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error...");
e.printStackTrace();
} else {
String message = String.format("sent message to topic:%s partition:%s offset:%s",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}
网友评论