美文网首页
实战Kafka ACL机制

实战Kafka ACL机制

作者: java菜 | 来源:发表于2018-12-03 14:53 被阅读25次

    1.概述

      在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

    2.内容

    2.2 身份认证

      Kafka的认证范围包含如下:

    Client与Broker之间

    Broker与Broker之间

    Broker与Zookeeper之间

      当前Kafka系统支持多种认证机制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。

    2.3  SSL认证流程

      在Kafka系统中,SSL协议作为认证机制默认是禁止的,如果需要使用,可以手动启动SSL机制。安装和配置SSL协议的步骤,如下所示:

    在每个Broker中Create一个Tmp密钥库

    创建CA

    给证书签名

    配置Server和Client

      执行脚本如下所示:

    1#! /bin/bash

    2# 1.Create rsa

    3keytool -keystore server.keystore.jks -aliasdn1 -validity 365 -genkey -keyalg RSA

    4# 2.Create CA

    5openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

    6# 3.Import client

    7keytool -keystore client.truststore.jks -aliasCAROOT -import -file ca-cert

    8# 4.Import server

    9keytool -keystore server.truststore.jks -aliasCAROOT -import -file ca-cert

    10# 5.Export

    11keytool -keystore server.keystore.jks -aliasdn1 -certreq -file cert-file

    12# 6.Signed

    13openssl x509 -req -CA ca-cert -CAkey ca-key -incert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456

    14# 7.Import ca-cert

    15keytool -keystore server.keystore.jks -aliasCARoot -import -file ca-cert

    16# 8.Import cert-signed

    17keytool -keystore server.keystore.jks -aliasdn1 -import -file cert-signed

    2.4 SASL认证流程

      在Kafka系统中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例,下面给大家介绍PLAIN认证流程。

    2.4.1  配置Server

      首先,在$KAFKA_HOME/config目录中新建一个文件,名为kafka_server_jaas.conf,配置内容如下:

    1KafkaServer {

    2org.apache.kafka.common.security.plain.PlainLoginModule required

    3username="smartloli"

    4password="smartloli-secret"

    5user_admin="smartloli-secret";

    6};

    7

    8Client {

    9org.apache.kafka.common.security.plain.PlainLoginModule required

    10username="smartloli"

    11password="smartloli-secret";

    12};

      然后在Kafka启动脚本(kafka-server-start.sh)中添加配置文件路径,设置内容如下:

    1[hadoop@dn1 bin]$ vi kafka-server-start.sh

    2

    3# Add jaas file

    4exportKAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka/config/kafka_server_jaas.conf"

    接下来,配置server.properties文件,内容如下:

    1# Set ip & port

    2listeners=SASL_PLAINTEXT://dn1:9092

    3advertised.listeners=SASL_PLAINTEXT://dn1:9092

    4# Set protocol

    5security.inter.broker.protocol=SASL_PLAINTEXT

    6sasl.enabled.mechanisms=PLAIN

    7sasl.mechanism.inter.broker.protocol=PLAIN

    8

    9# Add acl

    10allow.everyone.if.no.acl.found=true

    11auto.create.topics.enable=false

    12delete.topic.enable=true

    13advertised.host.name=dn1

    14super.users=User:admin

    15

    16# Add class

    17authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

    2.4.2 配置Client

      当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息,Client配置一个kafka_client_jaas.conf文件,内容如下:

    1KafkaClient {

    2org.apache.kafka.common.security.plain.PlainLoginModule required

    3username="admin"

    4password="admin-secret";

    5};

     然后,在producer.properties和consumer.properties文件中设置认证协议,内容如下:

    1security.protocol=SASL_PLAINTEXT

    2sasl.mechanism=PLAIN

    最后,在kafka-console-producer.sh脚本和kafka-console-producer.sh脚本中添加JAAS文件的路径,内容如下:

    1# For example: kafka-console-producer.sh

    2hadoop@dn1 bin]$ vi kafka-console-producer.sh

    3

    4# Add jaas file

    5exportKAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka\

    6/config/kafka_client_jaas.conf"

    2.5 ACL操作

      在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制。

      (1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权新

    1[hadoop@dn1 bin]$ kafka-acls.sh --list--authorizer-properties zookeeper.connect=dn1:2181

    (2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作

    1[hadoop@dn1 bin]$ kafka-topics.sh --create --zookeeperdn1:2181--replication-factor1--partitions1--topic kafka_acl_topic

    (3)生产者授权:对生产者执行授权操作

    1[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181--add--allow-principalUser:producer --operation Write --topic kafka_acl_topic

    (4)消费者授权:对生产者执行授权后,通过消费者来进行验证

    1[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181--add--allow-principalUser:consumer --operation Read --topic kafka_acl_topic

    (5)删除:通过remove参数来回收相关权限

    1[hadoop@dn1 bin]$ kafka-acls.sh --authorizer-properties zookeeper.connect=dn1:2181--remove--allow-principal User:producer --operation Write --topic kafka_acl_topic3

    3.总结

      在处理一些核心的业务数据时,Kafka的ACL机制还是非常重要的,对核心业务主题进行权限管控,能够避免不必要的风险。

    欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 854393687

    群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    相关文章

      网友评论

          本文标题:实战Kafka ACL机制

          本文链接:https://www.haomeiwen.com/subject/cwyycqtx.html