Kafka Access Control

Author: 无色的叶 | Published 2019-06-24 15:35

    Reposted from: https://www.jianshu.com/p/011567554f0f

    SASL/SCRAM + ACL: dynamically creating users and controlling permissions

    This article uses a self-deployed ZooKeeper; no special ZooKeeper configuration is required.

    Authenticating with SASL/SCRAM

    First, start Kafka without any authentication configured.

    1. Create SCRAM credentials

    Kafka's SCRAM implementation uses ZooKeeper as the credential store, and credentials can be created in ZooKeeper with kafka-configs.sh. For each enabled SCRAM mechanism, a credential must be created by adding a config entry named after the mechanism. Credentials for inter-broker communication must be created before the Kafka brokers start; client credentials can be created and updated dynamically, and updated credentials are used to authenticate new connections.

    Create the inter-broker communication user (also called the superuser):

    bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
    
    

    Create the client user fanboshi (the optional iterations parameter raises the SCRAM-SHA-256 iteration count above the default of 4096):

    bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=fanboshi],SCRAM-SHA-512=[password=fanboshi]' --entity-type users --entity-name fanboshi
    
    

    View the SCRAM credentials (the output shows the salt, stored_key, server_key, and iteration count for each mechanism):

    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name fanboshi
    Configs for user-principal 'fanboshi' are SCRAM-SHA-512=salt=MWwwdWJqcjBncmUwdzY1Mzdoa2NwNXppd3A=,stored_key=mGCJy5k3LrE2gs6Dp4ALRhgy37l1WYPUIdoOncCF+B3Ti3wL2sQNmzg8oEz3tUs9DFsclFCygjbysb0S0BU9bA==,server_key=iTyX0U0Jt02dkddUm6QrVwNf3lJk72dBNs9EDHTqe8kLlNGIp9ypzRkcgkc+WVMd1bkAF3cg8vk9Q1LrJ/2i/A==,iterations=4096,SCRAM-SHA-256=salt=ZDg5MHVlYW40dW9jbXJ6MndvZDVlazd3ag==,stored_key=cgX1ldpXnDL1+TlLHJ3IHn7tAQS/7pQ7BVZUtECpQ3A=,server_key=i7Mcnb5sPUqfIFs6qKWWHZ2ortoKiRc7oabHOV5dawI=,iterations=8192
    
    

    Delete SCRAM credentials

    For demonstration only; we do not actually run this here:

    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter  --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name fanboshi
    
    

    2. Configure the Kafka brokers

    1. On each Kafka broker, add a JAAS file like the following to the config directory; we will call it kafka_server_jaas.conf:
    [root@node002229 config]# cat kafka_server_jaas.conf
    KafkaServer {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="admin-secret";
    };
    
    

    Note: do not forget the trailing semicolon. The username and password here must match the admin SCRAM credential created in step 1.

    2. Pass the location of the JAAS config file to each Kafka broker as a JVM parameter.
      Edit /usr/local/kafka/bin/kafka-server-start.sh: comment out the line exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" and add the line below it:
    #exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
    
    

    Alternatively, leave kafka-server-start.sh unchanged and add the following to ~/.bashrc (kafka-run-class.sh appends $KAFKA_OPTS to the java command line):

    export KAFKA_PLAIN_PARAMS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
    export KAFKA_OPTS="$KAFKA_PLAIN_PARAMS $KAFKA_OPTS"
    
    
    3. Configure the SASL port and SASL mechanisms in server.properties, as described in the Kafka documentation. For example:
    # Authentication settings
    listeners=SASL_PLAINTEXT://node002229:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
    sasl.enabled.mechanisms=SCRAM-SHA-256
    
    # ACL settings
    allow.everyone.if.no.acl.found=false
    super.users=User:admin
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
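
    Note (beyond the original article): kafka.security.auth.SimpleAclAuthorizer is the pre-2.4 class name; on Kafka 2.4+ the ZooKeeper-backed authorizer was renamed, so on newer brokers you would instead set:

    authorizer.class.name=kafka.security.authorizer.AclAuthorizer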
    
    

    In the official documentation this is written as:

    listeners=SASL_SSL://host.name:port
    security.inter.broker.protocol=SASL_SSL
    
    

    There is actually no need to use SASL_SSL here; choose SSL or PLAINTEXT according to your own requirements. I chose PLAINTEXT (unencrypted, plaintext transport): it is simpler, and performance is somewhat better.
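
    For reference, if you do want TLS underneath SASL, the broker listener configuration would look roughly like the sketch below; the port, keystore/truststore paths, and passwords are illustrative placeholders, not values from this setup:

    listeners=SASL_SSL://node002229:9093
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
    sasl.enabled.mechanisms=SCRAM-SHA-256
    # TLS settings: placeholder paths and passwords
    ssl.keystore.location=/path/to/kafka.server.keystore.jks
    ssl.keystore.password=keystore-secret
    ssl.key.password=key-secret
    ssl.truststore.location=/path/to/kafka.server.truststore.jks
    ssl.truststore.password=truststore-secret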

    4. Restart ZooKeeper and Kafka.
      Restart the ZooKeeper and Kafka services; all brokers read kafka_server_jaas.conf before connecting.
      On all ZooKeeper nodes:
    [root@node002229 zookeeper]# zkServer.sh stop /usr/local/zookeeper/bin/../conf/zoo.cfg  
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo1.cfg
    Stopping zookeeper ... STOPPED
    
    [root@node002229 zookeeper]# zkServer.sh start /usr/local/zookeeper/bin/../conf/zoo.cfg  
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo1.cfg
    
    

    On all Kafka brokers:

    cd /usr/local/kafka/;bin/kafka-server-stop.sh
    cd /usr/local/kafka/;bin/kafka-server-start.sh -daemon config/server.properties
    
    

    Client configuration

    First, test with kafka-console-producer and kafka-console-consumer.

    kafka-console-producer

    1. Create the config/client-sasl.properties file:
    [root@node002229 kafka]# vim config/client-sasl.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    
    
    2. Create the config/kafka_client_jaas_admin.conf file:
    [root@node002229 kafka]# vim config/kafka_client_jaas_admin.conf 
    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="admin-secret";
    };
    
    
    3. Modify the kafka-console-producer.sh script.
      Here I make a copy and edit the copy:
    cp bin/kafka-console-producer.sh bin/kafka-console-producer-admin.sh
    vim bin/kafka-console-producer-admin.sh
    #exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
    exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_admin.conf kafka.tools.ConsoleProducer "$@"
    
    
    4. Create a test topic (kafka-topics.sh talks directly to ZooKeeper, so no SASL client configuration is needed for this step):
    bin/kafka-topics.sh --create --zookeeper node1:2181 --partitions 1 --replication-factor 1 --topic test
    
    
    5. Test producing messages:
    bin/kafka-console-producer-admin.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
    >1
    >
    
    

    As you can see, the admin user can produce messages without any ACL configuration, because it is a superuser.
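
    A side note beyond the original article: on Kafka clients 0.10.2 and later, you can skip the separate JAAS file and the wrapper scripts entirely by embedding the login module in the client properties via sasl.jaas.config, roughly like this (verify that your client version supports it):

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";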

    6. Test the fanboshi user.
      In the same fashion, create a bin/kafka-console-producer-fanboshi.sh file, changing kafka_client_jaas_admin.conf to kafka_client_jaas_fanboshi.conf:
    vim config/kafka_client_jaas_fanboshi.conf 
    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="fanboshi"
      password="fanboshi";
    };
    
    cp bin/kafka-console-producer-admin.sh bin/kafka-console-producer-fanboshi.sh
    vi bin/kafka-console-producer-fanboshi.sh
    exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_fanboshi.conf kafka.tools.ConsoleProducer "$@"
    
    

    Produce messages:

    [root@node002229 kafka]# bin/kafka-console-producer-fanboshi.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
    >1
    [2019-01-26 18:07:50,099] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
    [2019-01-26 18:07:50,100] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
    
    

    An error occurs, as expected: the fanboshi user has no permissions yet.

    kafka-console-consumer

    1. Create the config/consumer-fanboshi.properties file:
    [root@node002229 kafka]# vim config/consumer-fanboshi.properties
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    group.id=fanboshi-group
    
    
    2. Create the bin/kafka-console-consumer-fanboshi.sh file:
    cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-fanboshi.sh
    vim bin/kafka-console-consumer-fanboshi.sh
    #exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
    exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_fanboshi.conf kafka.tools.ConsoleConsumer "$@"
    
    
    3. Test the consumer:
    bin/kafka-console-consumer-fanboshi.sh --bootstrap-server node1:9092 --topic test --consumer.config config/consumer-fanboshi.properties --from-beginning
    
    

    This fails with an authorization error as well; the error output is omitted here.

    ACL configuration

    Grant the fanboshi user Write permission on the test topic, restricted to hosts in the 192.168.2.* range:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Write --topic test --allow-host 192.168.2.*
    
    

    Grant the fanboshi user Read permission on the test topic, restricted to hosts in the 192.168.2.* range:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Read --topic test --allow-host 192.168.2.*
    
    

    Grant the fanboshi user Read permission on the fanboshi-group consumer group (required for consuming the test topic), restricted to hosts in the 192.168.2.* range:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Read --group fanboshi-group --allow-host 192.168.2.*
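
    A note beyond the original article: kafka-acls.sh also offers --producer and --consumer convenience flags that grant the typical producer or consumer permission set in a single command; a sketch equivalent to the grants above (minus the host restriction):

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --producer --topic test
    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --consumer --topic test --group fanboshi-group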
    
    

    View the ACL configuration:

    [root@node002229 kafka]# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --list
    Current ACLs for resource `Group:LITERAL:fanboshi-group`: 
            User:fanboshi has Allow permission for operations: Read from hosts: * 
    
    Current ACLs for resource `Topic:LITERAL:test`: 
            User:fanboshi has Allow permission for operations: Write from hosts: *
            User:fanboshi has Allow permission for operations: Read from hosts: * 
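
    Two notes beyond the original article. First, --list can be filtered to a single resource, for example:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --list --topic test

    Second, the listing above reports "from hosts: *" even though the grants passed --allow-host 192.168.2.*. Kafka's ACL host matching only supports an exact IP address or the literal *, not address patterns, so do not rely on --allow-host for subnet-level restrictions.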
    
    

    Remove an ACL entry:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --remove --allow-principal User:fanboshi  --operation Write --topic test --allow-host *
    
    

    Test again

    Producer:

    [root@node002229 kafka]# bin/kafka-console-producer-fanboshi.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
    >1
    >1
    
    

    Consumer:

    [root@node002229 kafka]# bin/kafka-console-consumer-fanboshi.sh --bootstrap-server node1:9092 --topic test --consumer.config config/consumer-fanboshi.properties --from-beginning
    1
    1
    
    

    Everything works now.

    How to see which "users" we created

    It seems the only way is to look in ZooKeeper:

    zkCli.sh -server node1:2181
    ls /config/users
    [admin, alice, fanboshi]
    
    

    Try deleting alice:

    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice
    Configs for user-principal 'alice' are SCRAM-SHA-512=salt=MWt1OHRhZnd3cWZvZ2I4bXcwdTM0czIyaTQ=,stored_key=JYeud1Cx5Z2+FaJgJsZGbMcIi63B9XtA9Wyc+KEm2gXK8+2IxxAVvi1CfSjlkqeupfeIMFJ7/EUkOw+zqvYz6w==,server_key=O4NIgjleroia7puK01/ZZoagFeoxh+zHzckGXXooBsWTdx/7Shb0pMHniMu4IY2jb5orWB2t9K8MZkxCliJDsg==,iterations=4096,SCRAM-SHA-256=salt=MTJ3bXRod3EyN3FtZWdsNHk0NXoyeWdlNjE=,stored_key=chQX35reoBYtfg/U5HBtkzvBAk+gSCgskNzUiScOrUE=,server_key=rRTbUzAehwVMUDTMuoOMumGEuvc7wDecKcqK6yYlbWY=,iterations=8192
    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    Completed Updating config for entity: user-principal 'alice'.
    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice                              
    Configs for user-principal 'alice' are SCRAM-SHA-256=salt=MTJ3bXRod3EyN3FtZWdsNHk0NXoyeWdlNjE=,stored_key=chQX35reoBYtfg/U5HBtkzvBAk+gSCgskNzUiScOrUE=,server_key=rRTbUzAehwVMUDTMuoOMumGEuvc7wDecKcqK6yYlbWY=,iterations=8192
    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice   
    Completed Updating config for entity: user-principal 'alice'.
    [root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice                              
    Configs for user-principal 'alice' are 
    
    

    Check in ZooKeeper again:

    [zk: node1:2181(CONNECTED) 0] ls /config/users
    [admin, alice, fanboshi]
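
    Note that alice still appears under /config/users even though both of her SCRAM credentials were deleted: kafka-configs.sh only empties the user's config, it does not remove the znode itself. To make the entry disappear entirely, you would have to delete the znode from zkCli yourself; a hedged sketch (double-check before running this against a production cluster):

    [zk: node1:2181(CONNECTED) 1] delete /config/users/alice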
    
    

    Common Kafka ACL operations

    Create a topic

    Create it with bin/kafka-topics.sh.
    Note that bin/kafka-topics.sh talks to ZooKeeper rather than to Kafka: it is a ZooKeeper client, not a Kafka client, so its authentication is handled entirely by ZooKeeper.
    Case 1: ZooKeeper ACLs are not enabled:

    /opt/kafka/bin/kafka-topics.sh --create \
      --zookeeper node1:2181 \
      --replication-factor 1 \
      --partitions 1 \
      --topic kafkaclient-topic
    

    Case 2: ZooKeeper ACLs are enabled:
    The command is the same as above, but java.security.auth.login.config must point at a jaas.conf file. For example:

    $ cat $ZOOKEEPER_HOME/conf/zookeeper_client_jaas_admin.conf 
    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="adminpwd";
    };
    

    You can either edit the JVM startup script directly or set it through an environment variable:

    export KAFKA_OPTS=-Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zookeeper_client_jaas_admin.conf
    
    

    The user configured here must be a client user that the ZooKeeper server has been configured to allow. For example, given the following ZooKeeper server configuration:

    $ cat $ZOOKEEPER_HOME/conf/zookeeper_jaas.conf 
    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="adminpwd"
        user_admin="adminpwd";
    };
    

    a client authenticating as the admin user is allowed to connect to ZooKeeper.
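
    Two side notes beyond the original article. First, the stock Kafka documentation uses org.apache.zookeeper.server.auth.DigestLoginModule in these ZooKeeper Server/Client JAAS sections; PlainLoginModule is a Kafka class, so the setup above assumes the Kafka jars are on ZooKeeper's classpath. Second, on Kafka 2.2+ kafka-topics.sh also accepts --bootstrap-server, which turns it into a regular Kafka client that authenticates over SASL like any other client and sidesteps ZooKeeper ACLs entirely. A sketch reusing the earlier config/client-sasl.properties (the client still needs its JAAS credentials, e.g. via KAFKA_OPTS or an inline sasl.jaas.config):

    bin/kafka-topics.sh --create \
      --bootstrap-server node1:9092 \
      --command-config config/client-sasl.properties \
      --replication-factor 1 \
      --partitions 1 \
      --topic kafkaclient-topic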

    List topics

    kafka-topics.sh --list --zookeeper node1:2181
    

    ACL authentication for listing topics works the same way as for creating them; see the previous section.

    Delete a topic

    /opt/kafka/bin/kafka-topics.sh \
      --delete \
      --zookeeper node1:2181 \
      --topic kafkaclient-topic
    

    ACL authentication for deleting a topic works the same way as for creating one; see above.
