Using Kafka on Ambari

Author: 大猪大猪 | Published 2018-09-02 20:04

    The steps below apply to an Ambari cluster on which Kerberos authentication is enabled.

    1. First, enable Ranger authorization management.
    2. Configure permissions for the admin user.

    Console usage

    Consumer

    kafka-console-consumer.sh \
        --bootstrap-server t2.demo.com:9092 \
        --topic test_hello \
        --security-protocol PLAINTEXTSASL
    

    Producer

    kafka-console-producer.sh \
        --broker-list t2.demo.com:9092 \
        --topic test_hello \
        --property key.separator=' ' \
        --security-protocol PLAINTEXTSASL
    

    Start the consumer

    [root@t1 bin]# kafka-console-consumer.sh \
        --bootstrap-server t2.demo.com:9092 \
        --topic test_hello \
        --security-protocol PLAINTEXTSASL
    hi I'm lake
    

    Start the producer

    [root@t1 bin]# kafka-console-producer.sh \
        --broker-list t2.demo.com:9092 \
        --topic test_hello \
        --property key.separator=' ' \
        --security-protocol PLAINTEXTSASL
    >hi I'm lake
    >
    

    Using the Java API

    Add the dependencies

    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.1'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.1'
    

    The kafka_jass.conf configuration file

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      doNotPrompt=true
      useTicketCache=true
      principal="admin/admin@demo.com"
      useKeyTab=true
      serviceName="kafka"
      keyTab="/etc/security/keytabs/admin.keytab"
      client=true;
    };
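As an alternative to a separate JAAS file, kafka-clients 0.10.2 and later also accept the login module settings inline through the `sasl.jaas.config` client property. The sketch below builds such a value from the same principal and keytab as the file above. It is not what the original post does, and the class and method names (`InlineJaasSketch`, `jaasEntry`, `securityProps`) are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: builds client security settings without a JAAS file. */
final class InlineJaasSketch {

    /** Returns a Krb5LoginModule entry in the format expected by sasl.jaas.config. */
    static String jaasEntry(String principal, String keytabPath) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true "
                + "storeKey=true "
                + "keyTab=\"" + keytabPath + "\" "
                + "principal=\"" + principal + "\";";
    }

    /** Security-related client properties; merge these into producer or consumer props. */
    static Properties securityProps(String principal, String keytabPath) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        // Plays the role of serviceName="kafka" in the JAAS file.
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", jaasEntry(principal, keytabPath));
        return props;
    }

    public static void main(String[] args) {
        Properties props = securityProps("admin/admin@demo.com",
                "/etc/security/keytabs/admin.keytab");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

With this approach the `java.security.auth.login.config` system property is no longer needed for the Kafka client, which can be convenient when several clients with different principals run in one JVM.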
    

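    Producer

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test; // assuming JUnit 4

    @Test
    public void testProducer() {
        // Point the JVM at the JAAS login configuration and the Kerberos client configuration.
        System.setProperty("java.security.auth.login.config", "/path/kafka_jass.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        // Optional: print Kerberos debug output.
        System.setProperty("sun.security.krb5.debug", "true");
        Properties props = new Properties();
        props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Wait for the partition leader to acknowledge each record.
        props.put("acks", "1");
        props.put("security.protocol", "SASL_PLAINTEXT");
        // try-with-resources closes the producer, which also sends any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test_hello", "hi", "lake"));
        }
    }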
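The test above relies on two JVM system properties that point to files on disk. A small preflight check can make a wrong path obvious before the Kafka client starts its Kerberos login. This is only a sketch using the standard library; `KerberosPreflightSketch` and `findProblems` are hypothetical names, not part of Kafka or the original post.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical preflight check for file-valued system properties. */
final class KerberosPreflightSketch {

    /** Returns one message per property that is unset or points to an unreadable path. */
    static List<String> findProblems(String... propertyNames) {
        List<String> problems = new ArrayList<>();
        for (String name : propertyNames) {
            String value = System.getProperty(name);
            if (value == null) {
                problems.add(name + " is not set");
            } else if (!Files.isReadable(Paths.get(value))) {
                problems.add(name + " points to an unreadable path: " + value);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // The same two properties that the producer and consumer tests set.
        List<String> problems = findProblems(
                "java.security.auth.login.config",
                "java.security.krb5.conf");
        for (String problem : problems) {
            System.out.println(problem);
        }
    }
}
```

Calling `findProblems` right after the `System.setProperty` lines and failing the test when the list is not empty keeps path mistakes separate from genuine authentication errors.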
    

    Consumer

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    @Test
    public void testConsumer() {
        // Point the JVM at the Kerberos client configuration and the JAAS login configuration.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/path/kafka_jass.conf");
        Properties props = new Properties();
        props.put("group.id", "hi");
        props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("security.protocol", "SASL_PLAINTEXT");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_hello"));
        // Poll forever with a 100 ms timeout; stop the test manually once the messages are printed.
        for (;;) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ":" + record.value());
            }
        }
    }
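The producer and consumer tests repeat the same broker list and security protocol. One way to keep them in sync is a small factory that builds both property sets from shared settings. The sketch below uses only `java.util.Properties` and passes the serializer classes by name, which the Kafka clients accept. `ClientPropsSketch` and its methods are hypothetical names, and the producer part uses the Java client's `acks` key.

```java
import java.util.Properties;

/** Hypothetical factory for the client properties used by the two tests. */
final class ClientPropsSketch {

    private static final String SERIALIZATION = "org.apache.kafka.common.serialization.";

    /** Settings shared by producers and consumers on the Kerberized cluster. */
    private static Properties common(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }

    static Properties producer(String bootstrapServers) {
        Properties props = common(bootstrapServers);
        props.put("key.serializer", SERIALIZATION + "StringSerializer");
        props.put("value.serializer", SERIALIZATION + "StringSerializer");
        props.put("acks", "1");
        return props;
    }

    static Properties consumer(String bootstrapServers, String groupId) {
        Properties props = common(bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", SERIALIZATION + "StringDeserializer");
        props.put("value.deserializer", SERIALIZATION + "StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        String brokers = "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092";
        System.out.println(producer(brokers));
        System.out.println(consumer(brokers, "hi"));
    }
}
```

The returned objects can be passed directly to `new KafkaProducer<>(...)` and `new KafkaConsumer<>(...)`.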
    

    Article link: https://www.haomeiwen.com/subject/dszmwftx.html