美文网首页
Kafka安全机制解析及重构(四)| ACL权限控制

Kafka安全机制解析及重构(四)| ACL权限控制

作者: MisterCH | 来源:发表于2018-08-08 11:49 被阅读646次

    在前三篇文章中我们介绍了Kafka的安全机制,并自己重构了一个名为ABC的SASL机制
    Kafka安全机制解析及重构(一)
    Kafka安全机制解析及重构(二)
    Kafka安全机制解析及重构(二)

    我们将用户的权限细分为以下两类:

    • 连接权限:Client可以与Broker之间建立连接
    • 读写权限:Client可以往具体的Topic发送或消费消息

    根据上一篇文章中的配置方法配置好后,Kafka可以实现Client与Broker之间的连接鉴权,但是也仅仅是连接权限。当按照上一篇的尝试配置好后,如果再配置上Kafka自带的ACL,会发现broker之间无法同步数据,且客户端就算配置上权限,仍然无法访问指定的TOPIC。这与Broker之间通信使用PLAINTEXT机制有关。

    我们重读一下Kafka的ACL的说明

    Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R".

    Principal指的就是用于校验权限的信息,在我们的机制中,也就是用户名

    通过阅读kafka.security.auth.SimpleAclAuthorizer这个类可以发现,在authorize()这个方法中,principal是从session中读取出来的。

      override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
        val principal = session.principal
        val host = session.clientAddress.getHostAddress
        val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
    
        //check if there is any Deny acl match that would disallow this operation.
        val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
    
        //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny.
        val ops = if (Describe == operation)
          Set[Operation](operation, Read, Write, Delete)
        else
          Set[Operation](operation)
    
        //now check if there is any allow acl that will allow this operation.
        val allowMatch = ops.exists(operation => aclMatch(session, operation, resource, principal, host, Allow, acls))
    
        //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
        //when no acls are found or if no deny acls are found and at least one allow acls matches.
        val authorized = isSuperUser(operation, resource, principal, host) ||
          isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
          (!denyMatch && allowMatch)
    
        logAuditMessage(principal, authorized, operation, resource, host)
        authorized
      }
    

    可Kafka Broker之间明明是通过PLAINTEXT连接的,不会带上用户名信息的,那总该有个默认的principle吧,这个默认的principal可以在org.apache.kafka.common.security.auth.KafkaPrincipal中找到

    public class KafkaPrincipal implements Principal {
        public static final String SEPARATOR = ":";
        public static final String USER_TYPE = "User";
        public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    

    为了进一步验证broker之间的连接是否是用ANONYMOUS连接的,我们可以开启Kafka的debug日志,在config/log4j.properties中修改配置

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false
    

    然后就可以在logs/kafka-authorizer.log中看到Broker之间互相用ANONYMOUS来访问,然后被拒绝的信息了。

    既然明确了Broker之间使用的ANONYMOUS用户,那就好办了,将ANONYMOUS配置成超级用户就行了。我们只需要在server.properties中进行如下的配置,就可以让broker之间恢复正常通信:

    #broker内部通信使用PLAIN机制9092端口,外部通信使用SASL_PLAINTEXT的9093端口
    listeners=PLAINTEXT://hostname:9093,SASL_PALINTEXT://hostname:9092
    sasl.enabled.mechanism=ABC,PLAIN
    
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    security.inter.broker.protocol=PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    super.users=User:ANONYMOUS
    

    需要注意以下两点:

    1. 对外仅开放SASL_PLAINTEXT的端口,所有客户端连接都要通过SASL
    2. Broker之间的通信使用ANONYMOUS这个用户,因此不要在用户配置中添加上ANONYMOUS这个用户,这个用户仅能被broker使用。

    相关文章

      网友评论

          本文标题:Kafka安全机制解析及重构(四)| ACL权限控制

          本文链接:https://www.haomeiwen.com/subject/yvptbftx.html