以下内容是针对在
Ambari
上启用了Kerberos
认证的情况下处理的。
- 首先开启
ranger
权限管理
image.png
- 配置
admin
用户权限
image.png
控制台使用方法
消费者
kafka-console-consumer.sh
--bootstrap-server t2.demo.com:9092
--topic test_hello
--security-protocol PLAINTEXTSASL
生产者
kafka-console-producer.sh
--broker-list storm2.starsriver.cn:9092
--topic test_hello
--property key.separator=' '
--security-protocol PLAINTEXTSASL
启动消费者
[root@t1 bin]# kafka-console-consumer.sh
--bootstrap-server t2.demo.com:9092
--topic test_hello
--security-protocol PLAINTEXTSASL
hi I'm lake
启动生产品
[root@t1 bin]# kafka-console-producer.sh
--broker-list t2.demo.com:9092
--topic test_hello
--property key.separator=' '
--security-protocol PLAINTEXTSASL
>hi I'm lake
>
Java API 方式
导入依赖包
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.1'
kafka_jass.conf
配置文件
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="admin/admin@demo.com"
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/admin.keytab"
client=true;
};
生产者
@Test
public void testProducer(){
System.setProperty("java.security.auth.login.config", "/path/kafka_jass.conf");
System.setProperty("java.security.krb5.conf","/etc/krb5.conf");
System.setProperty("java.security.krb5.debug","true");
Properties props = new Properties();
props.put("metadata.broker.list", "t2.demo.com,t3.demo.com,t4.demo.com");
props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("request.required.acks", "1");
props.put("security.protocol", "SASL_PLAINTEXT");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord<String,String>("test_hello","hi","lake"));
producer.close();
}
消费者
@Test
public void testConsumer(){
System.setProperty("java.security.krb5.conf","/etc/krb5.conf");
System.setProperty("java.security.auth.login.config","/path/kafka_jass.conf");
Properties props = new Properties();
props.put("group.id", "hi");
props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("request.required.acks", "1");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test_hello"));
for(;;){
ConsumerRecords<String,String> list = consumer.poll(100);
for(ConsumerRecord<String,String> record : list){
System.out.println(record.key()+":"+record.value());
}
}
}
网友评论