Kafka-0.11.0.1 Authorization Mechanism Explained

Author: shawnliang | Published 2019-01-10 09:21

    1. The command entry point

    If you use Kafka regularly, you have probably seen the command that grants a user permission to consume from a topic:

    bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
      --authorizer-properties zookeeper.connect=localhost:2181 --add \
      --allow-principal User:* --operation read --topic test --group mygroup
    

    This command explicitly names the authorizer class, kafka.security.auth.SimpleAclAuthorizer.
    You can also omit it. The kafka-acls script itself contains only a single line:

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"

    In the AclCommand class, the code that parses the command-line arguments contains this snippet:

    class AclCommandOptions(args: Array[String]) {
        val parser = new OptionParser(false)
        val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
          .withRequiredArg
          .describedAs("authorizer")
          .ofType(classOf[String])
          .defaultsTo(classOf[SimpleAclAuthorizer].getName)
        // ... remaining options omitted
    }
    

    So when no authorizer class is given, the default is kafka.security.auth.SimpleAclAuthorizer.
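
    To make the defaultsTo mechanism concrete, here is a minimal Java sketch of "take the given fully qualified class name, otherwise fall back to a default, and instantiate it by reflection". AuthorizerSketch, DefaultAuthorizerSketch and AuthorizerLoader are hypothetical stand-ins, not Kafka classes. The article does not show how Kafka itself instantiates the authorizer.

```java
import java.util.Optional;

// Hypothetical stand-in for kafka.security.auth.Authorizer (not Kafka's real interface).
interface AuthorizerSketch {
    boolean authorize(String principal, String operation, String resource);
}

// Hypothetical default implementation playing the role of SimpleAclAuthorizer.
class DefaultAuthorizerSketch implements AuthorizerSketch {
    @Override
    public boolean authorize(String principal, String operation, String resource) {
        return false; // placeholder: denies everything
    }
}

final class AuthorizerLoader {
    // Like defaultsTo(classOf[SimpleAclAuthorizer].getName): the default is a class name string.
    static final String DEFAULT_AUTHORIZER = DefaultAuthorizerSketch.class.getName();

    // Use the given fully qualified class name, or the default, and instantiate it reflectively.
    static AuthorizerSketch load(Optional<String> className) {
        String name = className.orElse(DEFAULT_AUTHORIZER);
        try {
            Class<?> cls = Class.forName(name);
            return (AuthorizerSketch) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate authorizer " + name, e);
        }
    }
}
```

    Calling AuthorizerLoader.load(Optional.empty()) returns an instance of the default class, in the same way that omitting --authorizer selects SimpleAclAuthorizer.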

    2. A line-by-line reading of kafka.security.auth.SimpleAclAuthorizer

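    Before reading the code, let's clarify what an ACL is.
    As the name says, an ACL is an access control list. A common misconception is that the ACL itself enforces permissions. In fact, an ACL only stores the permission rules. The enforcement logic lives in the authorization module. ACL permissions also come in two kinds: functional permissions and data permissions.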

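    Applied to Kafka topics, the permission to create, modify, or delete a topic is a functional permission. The permission to produce to or consume from a particular topic is a data permission.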

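    Here, the permission to create, modify, or delete topics is not controlled by SimpleAclAuthorizer. It is controlled by ZooKeeper (ZK), because in this version the topic tools such as kafka-topics.sh write topic metadata directly to ZK.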

    On the ZK side, the org.I0Itec.zkclient.ZkClient class attaches credentials to a connection through addAuthInfo:
    
    /**
         * Add authentication information to the connection. This will be used to identify the user and check access to
         * nodes protected by ACLs
         * 
         * @param scheme
         * @param auth
         */
        public void addAuthInfo(final String scheme, final byte[] auth) {
            retryUntilConnected(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    _connection.addAuthInfo(scheme, auth);
                    return null;
                }
            });
        }
    

    Functional permissions are not the focus of this article, so we won't go into them further.

    The SimpleAclAuthorizer class is only 368 lines long (including comments), and the core authorization method is under 30 lines. (Kafka's code is quite concise!)

      override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
        val principal = session.principal
        val host = session.clientAddress.getHostAddress
        val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
    
        // Check if there is any Deny acl match that would disallow this operation.
        val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
    
        // Check if there are any Allow ACLs which would allow this operation.
        // Allowing read, write, delete, or alter implies allowing describe.
        // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
        val allowOps = operation match {
          case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
          case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
          case _ => Set[Operation](operation)
        }
        val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
    
        //we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
        //when no acls are found or if no deny acls are found and at least one allow acls matches.
        val authorized = isSuperUser(operation, resource, principal, host) ||
          isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
          (!denyMatch && allowMatch)
    
        logAuditMessage(principal, authorized, operation, resource, host)
        authorized
      }
    

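    The method takes only three parameters. session carries information about the calling client, including its principal and address. operation is the operation being authorized, such as Read, Write, or Describe (a producer needs Write, a consumer needs Read). resource is the resource being accessed, such as a topic or a consumer group.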

    The method body first obtains the principal for this request. The principal is an instance of KafkaPrincipal, defined as follows:

    public class KafkaPrincipal implements Principal {
        public static final String SEPARATOR = ":";
        public static final String USER_TYPE = "User";
        public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    
        private String principalType;
        private String name;
    
        public KafkaPrincipal(String principalType, String name) {
            if (principalType == null || name == null) {
                throw new IllegalArgumentException("principalType and name can not be null");
            }
            this.principalType = principalType;
            this.name = name;
        }
    
        public static KafkaPrincipal fromString(String str) {
            if (str == null || str.isEmpty()) {
                throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
            }
    
            String[] split = str.split(SEPARATOR, 2);
    
            if (split == null || split.length != 2) {
                throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
            }
    
            return new KafkaPrincipal(split[0], split[1]);
        }
        // ... remaining members omitted
    }
    

    The code is straightforward. The useful part is this check:

    if (split == null || split.length != 2) {
        throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
    }
    

    Why this matters will become clear later.
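
    The effect of split(SEPARATOR, 2) is easy to check with a small stand-alone copy of the parsing logic. PrincipalSketch is a hypothetical name. Its fromString body follows the KafkaPrincipal code above, and the rest of that class is left out.

```java
// Stand-alone copy of the KafkaPrincipal.fromString parsing rule ("type:name").
final class PrincipalSketch {
    final String type;
    final String name;

    PrincipalSketch(String type, String name) {
        this.type = type;
        this.name = name;
    }

    static PrincipalSketch fromString(String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("expected principalType:principalName but got " + str);
        }
        // Limit 2: only the first ':' separates type from name.
        String[] split = str.split(":", 2);
        if (split.length != 2) {
            throw new IllegalArgumentException("expected principalType:principalName but got " + str);
        }
        return new PrincipalSketch(split[0], split[1]);
    }
}
```

    With this rule, "User:alice" parses into type User and name alice. A name that itself contains a colon (for example "User:CN=a:b") stays intact because only the first colon splits. A bare "alice" is rejected.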

    Next comes the most important line:

    val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
    

    This line fetches the ACL list for the resource. Suppose, for example, that the resource being checked is new Resource(Topic, "myTopic"). Two points are worth noting:

    1. ResourceType is an enumeration of Kafka's resource kinds, such as Topic and Group.

    2. For efficiency, the ACLs are not read from ZK on every call. They are read from an in-memory cache. So when is that cache updated?

    You may also have noticed the variable Resource.WildCardResource. It is simply the string "*". Why have a resource named "*"?

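    A Kafka cluster may hold hundreds or thousands of topics, and consumers may use even more groups. Granting permissions on each of them individually is impractical, so a wildcard mechanism is needed. For example, take the command from the beginning of the article and replace the concrete resource names with *: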

    bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
      --authorizer-properties zookeeper.connect=localhost:2181 --add \
      --allow-principal User:* --operation read --topic '*' --group '*'
    

    This grants the permission on all topics and groups. It works precisely because the ACL lookup also queries the resource named *.
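
    The wildcard lookup can be sketched as the union of two cache lookups. This is a simplified Java model, not Kafka's code. AclCacheSketch is hypothetical, resources are keyed as "ResourceType:name", and each ACL is reduced to a plain string.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical broker-side cache: "ResourceType:name" -> ACL entries (plain strings here).
final class AclCacheSketch {
    static final String WILDCARD_RESOURCE = "*"; // same value as Resource.WildCardResource

    private final Map<String, Set<String>> cache = new HashMap<>();

    void put(String resourceType, String name, Set<String> acls) {
        cache.put(resourceType + ":" + name, acls);
    }

    private Set<String> get(String resourceType, String name) {
        return cache.getOrDefault(resourceType + ":" + name, Collections.emptySet());
    }

    // Mirrors getAcls(resource) ++ getAcls(new Resource(resource.resourceType, "*")).
    Set<String> aclsFor(String resourceType, String name) {
        Set<String> result = new HashSet<>(get(resourceType, name));
        result.addAll(get(resourceType, WILDCARD_RESOURCE));
        return result;
    }
}
```

    An ACL granted on Topic:* therefore appears for every topic but not for groups, because the wildcard resource keeps the same resource type.
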
    The ACLs of each resource are stored as JSON on a ZK path:

    /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    
    /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    
    /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    

    Here is the code of the Acl object:

    object Acl {
      val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
      val WildCardHost: String = "*"
      val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
      val PrincipalKey = "principal"
      val PermissionTypeKey = "permissionType"
      val OperationKey = "operation"
      val HostsKey = "host"
      val VersionKey = "version"
      val CurrentVersion = 1
      val AclsKey = "acls"
      
      // ...
    
      def fromJson(aclJson: String): Set[Acl] = {
        if (aclJson == null || aclJson.isEmpty)
          return collection.immutable.Set.empty[Acl]
    
        var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
        Json.parseFull(aclJson) match {
          case Some(m) =>
            val aclMap = m.asInstanceOf[Map[String, Any]]
            //the acl json version.
            require(aclMap(VersionKey) == CurrentVersion)
            val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
            aclSet.foreach(item => {
              val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
              val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
              val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
              val host: String = item(HostsKey).asInstanceOf[String]
              acls += new Acl(principal, permissionType, host, operation)
            })
          case None =>
        }
        acls.toSet
      }
    }
    

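    The fromJson method deserializes the ACL JSON stored in ZK into Acl objects. The principal is parsed with KafkaPrincipal.fromString, so every principal string stored in ZK must have the form type:name, for example User:alice. (This is the point foreshadowed earlier.)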

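    PermissionType and Operation are likewise parsed with their fromString methods, so the strings stored in ZK must be valid values of those enumerations (for example Allow or Deny, Read or Write).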
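
    Here is a sketch of that constraint: parsing a stored string into an enum either finds a matching value or fails. The enums below are hypothetical and list only a subset of Kafka's operations. Case-insensitive matching is an assumption (the command at the start of the article does pass "read" in lower case). The sketch throws IllegalArgumentException, and the exception type Kafka uses is not shown in the article.

```java
// Hypothetical enums; the stored JSON strings ("Allow", "Read", ...) are matched by wireName.
// Assumption: matching is case-insensitive. Only a subset of operations is listed.
enum PermissionTypeSketch {
    ALLOW("Allow"), DENY("Deny");

    final String wireName;

    PermissionTypeSketch(String wireName) {
        this.wireName = wireName;
    }

    static PermissionTypeSketch fromString(String s) {
        for (PermissionTypeSketch p : values()) {
            if (p.wireName.equalsIgnoreCase(s)) {
                return p;
            }
        }
        throw new IllegalArgumentException(s + " is not a valid permission type");
    }
}

enum OperationSketch {
    READ("Read"), WRITE("Write"), DESCRIBE("Describe"), ALL("All");

    final String wireName;

    OperationSketch(String wireName) {
        this.wireName = wireName;
    }

    static OperationSketch fromString(String s) {
        for (OperationSketch op : values()) {
            if (op.wireName.equalsIgnoreCase(s)) {
                return op;
            }
        }
        throw new IllegalArgumentException(s + " is not a valid operation");
    }
}
```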

    Next comes the actual authorization logic:

    1. First, check whether any Deny ACL matches this operation.

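    2. Next, check whether any Allow ACL matches it. Describe is special: allowing Read, Write, Delete, or Alter implies allowing Describe (and allowing AlterConfigs implies allowing DescribeConfigs).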

    3. Then check whether the principal is a super user.

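    4. Finally, check whether access is granted by default because no ACLs exist for the resource. This behavior is controlled by the Kafka setting allow.everyone.if.no.acl.found.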

    As the code comment explains, the request is authorized if any of the following holds:

    1. The principal is a super user.

    2. No ACLs are found for the resource (including its wildcard resource) and allow.everyone.if.no.acl.found is set to true.

    3. No Deny ACL matches and at least one Allow ACL matches.
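
    The decision logic can be restated as a self-contained Java sketch that mirrors the Scala authorize method above. All names are hypothetical, and the ACL entries are simplified. The matching rule in anyMatch is an assumption, because the article does not show aclMatch: a match requires an exact or wildcard principal, an exact operation or All, and an exact or wildcard host.

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;

enum AuthOp { READ, WRITE, DELETE, ALTER, DESCRIBE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALL }

enum AuthPerm { ALLOW, DENY }

// One ACL entry; "User:*" and "*" act as the wildcard principal and host.
final class AclEntry {
    final String principal;
    final AuthPerm perm;
    final String host;
    final AuthOp op;

    AclEntry(String principal, AuthPerm perm, String host, AuthOp op) {
        this.principal = principal;
        this.perm = perm;
        this.host = host;
        this.op = op;
    }
}

final class AuthorizeSketch {
    private final Set<String> superUsers;
    private final boolean allowEveryoneIfNoAclFound; // allow.everyone.if.no.acl.found

    AuthorizeSketch(Set<String> superUsers, boolean allowEveryoneIfNoAclFound) {
        this.superUsers = superUsers;
        this.allowEveryoneIfNoAclFound = allowEveryoneIfNoAclFound;
    }

    // Assumed rule: exact or wildcard principal, exact or All operation, exact or wildcard host.
    private static boolean anyMatch(Collection<AclEntry> acls, AuthOp op, String principal,
                                    String host, AuthPerm perm) {
        for (AclEntry a : acls) {
            if (a.perm == perm
                    && (a.principal.equals(principal) || a.principal.equals("User:*"))
                    && (a.op == op || a.op == AuthOp.ALL)
                    && (a.host.equals(host) || a.host.equals("*"))) {
                return true;
            }
        }
        return false;
    }

    // acls = ACLs of the resource plus those of the wildcard resource "*".
    boolean authorize(String principal, String host, AuthOp op, Collection<AclEntry> acls) {
        boolean denyMatch = anyMatch(acls, op, principal, host, AuthPerm.DENY);

        // Read/Write/Delete/Alter imply Describe; AlterConfigs implies DescribeConfigs.
        Set<AuthOp> allowOps;
        switch (op) {
            case DESCRIBE:
                allowOps = EnumSet.of(AuthOp.DESCRIBE, AuthOp.READ, AuthOp.WRITE, AuthOp.DELETE, AuthOp.ALTER);
                break;
            case DESCRIBE_CONFIGS:
                allowOps = EnumSet.of(AuthOp.DESCRIBE_CONFIGS, AuthOp.ALTER_CONFIGS);
                break;
            default:
                allowOps = EnumSet.of(op);
        }
        boolean allowMatch = false;
        for (AuthOp candidate : allowOps) {
            if (anyMatch(acls, candidate, principal, host, AuthPerm.ALLOW)) {
                allowMatch = true;
                break;
            }
        }

        boolean superUser = superUsers.contains(principal);
        boolean noAclAndAllowed = acls.isEmpty() && allowEveryoneIfNoAclFound;
        return superUser || noAclAndAllowed || (!denyMatch && allowMatch);
    }
}
```

    The final expression has two consequences. A super user is authorized even when a Deny ACL matches. The allow.everyone.if.no.acl.found setting only matters when neither the resource nor its wildcard has any ACLs.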

    That completes the walkthrough of this method. Simple, isn't it?

    One question is still open: when is the ACL cache updated? The initialization method configure shows the answer:

      override def configure(javaConfigs: util.Map[String, _]) {
        val configs = javaConfigs.asScala
        val props = new java.util.Properties()
        configs.foreach { case (key, value) => props.put(key, value.toString) }
    
        superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
          case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
        }.getOrElse(Set.empty[KafkaPrincipal])
    
        shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
    
        // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
        // means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also
        // set).
        val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
        val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
        val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
        val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
    
        zkUtils = ZkUtils(zkUrl,
                          sessionTimeout = zkSessionTimeOutMs,
                          connectionTimeout = zkConnectionTimeoutMs,
                          kafkaConfig.zkEnableSecureAcls)
        zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
    
        loadCache()
    
        zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
        aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
        aclChangeListener.init()
      }
    

    1. During initialization, loadCache() loads the ACLs from ZK into the cache.

    2. Initialization also registers a ZK listener for ACL changes, whose change nodes look like /kafka-acl-changes/acl_changes_0000000000. (What other uses of ZK watches do you know?)

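    When the ACLs of a resource change, a node with an increasing sequence number is created under the ZK path /kafka-acl-changes, and the node's data records which resource changed. Each broker's listener then reloads that resource's ACLs from ZK into its cache. The format of the resource string follows the definition of Resource: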

    case class Resource(resourceType: ResourceType, name: String) {
    
      override def toString: String = {
        resourceType.name + Resource.Separator + name
      }
    }
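
    The notification flow can be simulated in memory to show why only the changed resource needs reloading. This is a hypothetical Java model, not ZkNodeChangeNotificationListener. Plain maps stand in for the ZK paths, and the broker polls instead of using a ZK watch. The separator is assumed to be ":", and purging of old change nodes is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory simulation of the ZK change-notification flow (no real ZooKeeper involved).
final class AclChangeSketch {
    static final String CHANGE_PREFIX = "acl_changes_";

    final Map<String, String> aclStore = new HashMap<>();             // stands in for /kafka-acl/...
    final NavigableMap<String, String> changeNodes = new TreeMap<>(); // stands in for /kafka-acl-changes
    final Map<String, String> cache = new HashMap<>();                // one broker's ACL cache

    private long nextSeq = 0;
    private String lastProcessed = null;

    // Writer side: store the new ACLs, then append a sequential node naming the changed resource.
    void updateAcls(String resource, String aclJson) {
        aclStore.put(resource, aclJson);
        String node = CHANGE_PREFIX + String.format("%010d", nextSeq++);
        changeNodes.put(node, resource);
    }

    // Broker side: handle every change node newer than the last one processed.
    void processNewNotifications() {
        NavigableMap<String, String> pending =
                lastProcessed == null ? changeNodes : changeNodes.tailMap(lastProcessed, false);
        for (Map.Entry<String, String> e : pending.entrySet()) {
            String resource = e.getValue(); // e.g. "Topic:orders" (type + separator + name)
            if (resource.split(":", 2).length != 2) {
                throw new IllegalStateException("Malformed resource: " + resource);
            }
            // Re-read only this resource's ACLs from the store into the cache.
            String acls = aclStore.get(resource);
            if (acls == null) {
                cache.remove(resource);
            } else {
                cache.put(resource, acls);
            }
            lastProcessed = e.getKey();
        }
    }
}
```

    In this sketch, zero-padding the sequence number keeps the node names in creation order when sorted as strings. That ordering is what lets the broker resume right after the last node it has already processed.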
    

    3. Extending kafka.security.auth.SimpleAclAuthorizer

    Kafka 0.11.0.1 only supports authorization per user. What should we do if we want to authorize by user role?

    If you followed the walkthrough above, the solution is easy to see:

    1. When creating a role, write its ACL data to the corresponding resource nodes in ZK. All you need is a KafkaPrincipal that represents the role.

    2. At authorization time, look up the roles bound to the user and check the ACLs of those roles, again by constructing a KafkaPrincipal for each role.

    3. When a role's permissions change, remember to create a change record under /kafka-acl-changes so that the brokers pick up the change promptly.
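
    Here is a minimal Java sketch of steps 1 and 2, built on hypothetical assumptions. Roles are principals of a made-up type Role, so "Role:analyst" still satisfies the type:name format that KafkaPrincipal.fromString requires. User-to-role bindings live in an in-memory map. Permissions are simplified to "Operation@Type:name" strings, and Deny is not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical role extension: a user is checked as itself and as each of its roles.
final class RoleAuthorizerSketch {
    static final String ROLE_TYPE = "Role"; // hypothetical principal type; keeps the type:name format

    private final Map<String, Set<String>> userRoles = new HashMap<>(); // "User:alice" -> {"Role:analyst"}
    private final Map<String, Set<String>> grants = new HashMap<>();    // principal -> allowed "Op@Type:name"

    void bindRole(String user, String roleName) {
        userRoles.computeIfAbsent(user, k -> new HashSet<>()).add(ROLE_TYPE + ":" + roleName);
    }

    void grant(String principal, String permission) {
        grants.computeIfAbsent(principal, k -> new HashSet<>()).add(permission);
    }

    // Allowed if the user itself or any bound role holds the permission (Deny omitted for brevity).
    boolean isAllowed(String user, String permission) {
        List<String> principals = new ArrayList<>();
        principals.add(user);
        principals.addAll(userRoles.getOrDefault(user, Collections.emptySet()));
        for (String p : principals) {
            if (grants.getOrDefault(p, Collections.emptySet()).contains(permission)) {
                return true;
            }
        }
        return false;
    }
}
```

    In a real extension, the grants would presumably be the ACLs read from ZK through the cache. The role lookup would then happen inside authorize, before the Deny/Allow matching.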

        Article link: https://www.haomeiwen.com/subject/cfjorqtx.html