Configuring SASL Authentication for Kafka in KRaft Mode

Author: ShootHzj | Published 2021-07-06 08:24

First, set up KRaft mode: generate a cluster UUID, then format the storage directories with it.

./bin/kafka-storage.sh random-uuid
./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
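
For example, the generated cluster ID can be captured in a shell variable and passed straight to the format command (the ID value itself will differ on every run):

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c ./config/kraft/server.properties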

Server-side properties configuration

In config/kraft/server.properties, add or adjust the following (the rest of the default KRaft settings, such as process.roles, node.id, and controller.quorum.voters, can stay as they are):

listeners=SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

Server-side JAAS configuration

Create a file named jaas.conf under the config directory:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="password"
   user_admin="password"
   user_alice="pwd";
};

Here, user_alice="pwd" defines a user named alice with password pwd. The username/password pair at the top is the credential the broker itself uses for inter-broker connections, which is why that same user also appears as a user_admin entry.

Then point the broker at the JAAS file:

export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf
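
With KAFKA_OPTS exported, start the broker as usual; the Kafka launcher scripts pass KAFKA_OPTS through to the JVM:

./bin/kafka-server-start.sh ./config/kraft/server.properties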

Connecting from client code

The client connects as user alice with password pwd:

package com.github.shoothzj.demo.kafka;

import com.github.shoothzj.javatool.util.CommonUtil;
import com.github.shoothzj.javatool.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class KafkaConsumerSaslTest {

    @Test
    public void kafkaConsumerSasl() {
        LogUtil.configureLog();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConst.BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConst.OFFSET_RESET_LATEST);

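        // SASL/PLAIN settings; the username and password must match a user_<name> entry in the broker-side JAAS file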
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        String saslJaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"%s\" \npassword=\"%s\";", "alice", "pwd");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-sasl"));
        CommonUtil.sleep(TimeUnit.SECONDS, 5);
        for (int i = 0; i < 5; i++) {
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                log.info("consumer record {} key {}", consumerRecord.offset(), consumerRecord.key());
            }
            CommonUtil.sleep(TimeUnit.SECONDS, 10);
        }
        consumer.close();
    }

}

The SASL connection succeeds.
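
For an end-to-end check, something also needs to publish to the test-sasl topic. Below is a minimal producer sketch using the same SASL/PLAIN settings; it is illustrative and not part of the author's repository, and localhost:9092 should be replaced with your broker address.

package com.github.shoothzj.demo.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerSaslExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address; adjust to your environment
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // same SASL/PLAIN credentials as the consumer test
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"alice\" password=\"pwd\";");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send a single record and block until the broker acknowledges it
            producer.send(new ProducerRecord<>("test-sasl", "key", "hello sasl")).get();
        }
    }
}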

Source code

https://github.com/Shoothzj/maven-demo/blob/58409a4ad4de76c7458267a1fda1895d60a2ec95/demo-kafka/src/test/java/com/github/shoothzj/demo/kafka/KafkaConsumerSaslTest.java
