美文网首页
Kafka SASL 安全认证

Kafka SASL 安全认证

作者: watermark | 来源:发表于2020-10-19 23:27 被阅读0次

    服务端

    • 在服务器节点配置认证文件:
      文件路径:kafka/config/kafka_server_jaas.conf
      文件内容:

      KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin"
      user_admin="admin"
      user_rex="123456"
      user_alice="123456"
      user_lucy="123456";
      };
      

      注意配置文件中的两个分号的位置,多一不可,缺一不可。

    • 修改服务器节点的启动配置文件:
      复制 kafka/config/server.propertieskafka/config/server-sasl.properties,在文件最末尾添加:

      listeners=SASL_PLAINTEXT://localhost:9092
      security.inter.broker.protocol=SASL_PLAINTEXT
      sasl.enabled.mechanisms=PLAIN
      sasl.mechanism.inter.broker.protocol=PLAIN
      authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
      super.users=User:admin
      
    • 修改服务器节点的启动脚本:
      复制 kafka/bin/kafka-server-start.shkafka/bin/kafka-server-start-sasl.sh,将认证信息配置到 kafka 服务器节点的 JVM 启动参数中:

      if [ "x$KAFKA_OPTS" ]; then
          export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_server_jaas.conf"
      fi
      
    • 启动 zookeeper:./bin/zookeeper-server-start.sh config/zookeeper.properties

    • 以安全认证的方式启动 kafka-server:./bin/kafka-server-start-sasl.sh config/server-sasl.properties

    • 创建一个 topic:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    生产者 + 消费者

    • 配置 生产者/消费者 的认证信息:
      文件路径:kafka/config/kafka_client_jaas.conf
      文件内容:

      KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin";
      };
      

      注意配置文件中的两个分号的位置,多一不可,缺一不可。

    • 修改 生产者 的启动配置文件:
      复制 kafka/config/producer.propertieskafka/config/producer-sasl.properties
      在文件最末尾添加:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      
    • 修改 消费者 的启动配置文件:
      复制 kafka/config/consumer.propertieskafka/config/consumer-sasl.properties
      在文件最末尾添加:

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      
    • 修改生产者的启动脚本:
      复制 kafka/bin/kafka-console-producer.shkafka/bin/kafka-console-producer-sasl.sh,将认证信息配置到 kafka 生产者的 JVM 启动参数中:

      if [ "x$KAFKA_OPTS" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf"
      fi
      
    • 修改消费者的启动脚本:
      复制 kafka/bin/kafka-console-consumer.shkafka/bin/kafka-console-consumer-sasl.sh,将认证信息配置到 kafka 生产者的 JVM 启动参数中:

      if [ "x$KAFKA_OPTS" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf"
      fi
      
    • 以安全认证的方式启动 kafka-producer:./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
      发送消息测试:

      生产者发送消息测试.png
    • 以安全认证的方式启动 kafka-consumer:./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties

      消费者接收消息.png

    java client

    • java client 中添加 SASL 设置信息:

      Java client consumer properties配置.png

      注意 sasl.jaas.config 配置中的分号必不可少。

      package kafka;
      
      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class TestKafkaSasl {
      
          private static final Logger logger = LoggerFactory.getLogger(TestKafkaSasl.class);
          
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              props.put("group.id", "test_group");
              props.put("enable.auto.commit", "true");
              props.put("auto.commit.interval.ms", "1000");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              // sasl.jaas.config的配置, 结尾分号必不可少.
              props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
              props.setProperty("security.protocol", "SASL_PLAINTEXT");
              props.setProperty("sasl.mechanism", "PLAIN");
      
              @SuppressWarnings("resource")
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
              consumer.subscribe(Arrays.asList("test"));
              while (true) {
                  try {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                                record.offset(), record.partition(), record.key(), record.value());
                          logger.info("offset = {}, partition = {}, key = {}, value = {}",
                                record.offset(), record.partition(), record.key(), record.value());
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                      logger.error(e.getMessage());
                  }
              }
          }
      }
      

      运行测试:

      继续生产数据.png java client sasl消费数据测试.png

    可能遇到的问题

    • 生产者进程启动报错

      生产者启动报错.png
      解决方案:以安全认证的方式启动 kafka-producer,别忘记添加设置配置文件参数。 ./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
    • 消费者进程启动报错

      消费者启动报错.png
      解决方案:以安全认证的方式启动 kafka-consumer,别忘记添加设置配置文件参数。 ./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties

    相关文章

      网友评论

          本文标题:Kafka SASL 安全认证

          本文链接:https://www.haomeiwen.com/subject/jbvlmktx.html