美文网首页数客联盟
Spark-Streaming Kafka In Kerbero

Spark-Streaming Kafka In Kerbero

作者: 杨光明子 | 来源:发表于2017-07-28 12:19 被阅读1316次

    最近在HDP2.6的环境里尝试了Kerberos,在各组件运行正常的情况下最终成功运行spark-streaming应用,总结一下就是一叶障目,不见泰山,坑多梯子少。尤其在国内,关于Kerberos的资料较少,但在生产环境中,Kerberos又是如鲠在喉,无法忽视。

    因此分享这篇文章,希望能给还在苦苦爬坑的小伙伴们一点帮助。

    • 我们的HDP为单用户ocsp安装,多用户需要根据以下步骤进行细微修改

    确认OCSP各组件的Kerberos工作正常

    1. Kafka

    • 使用kafka-topics.sh创建topic

    • 使用kafka producer和consumer需要先kinit
      kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM

    • 使用producer发送消息,consumer消费消息

      • kafka producer
        /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASL

      • kafka consumer
        /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667

    FAQ:

    • 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
    • 否则:
      • kafka producer 报错:

        [2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

      • kafka consumer 报错:

        javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user


    2. Hive

    • kinit
    • 使用beeline登录

    3. Phoenix

    • 使用sqlline与principal,keytab登录

    进行Spark,Kafka针对Kerberos相关配置

    1. 先放上最后提交任务的命令

    spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
    
    • --principal与--keytab这两个参数为spark需要的Kerberos认证信息

    • --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"为driver连接kafka用到的认证信息,因此使用本地绝对路径

    • --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"为executor连接kafka用到的Kerberos认证信息,因此使用container中的相对路径./

    • jaas文件中定义了principal与keytab,由于我们使用了yarn-client模式,driver需要的文件在本地文件系统,executor需要的文件需要我们使用--files的方式上传,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"

    • 有的文档中说--files中传keytab文件会与spark本身的--keytab 冲突,其实是因为他们对spark和kafka使用了相同的principal和keytab,在上述命令中我为了清晰起见,让spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab,让spark连接kafka时使用了principal ocsp/ASIAINFO.COM(principal其实是在jaas文件中指定的,3中详细讲jaas文件) keytab ocsp.keytab,当spark提交任务时,yarn会将--keytab后面的keytab文件与--files里的文件先后上传,即 hdfs.headless.keytab与ocsp.keytab均会被上传,spark与kafka各取所需,即可正常工作。当spark与kafka要使用相同的keytab文件时,比如都用ocsp.keytab,那么yarn会先后上传两次ocsp.keytab,在spark正使用的时候更新了keytab,造成异常退出

    • 因此如果spark与kafka需要使用相同的keytab文件,我们只需要在--files里不要上传keytab即可避免冲突

    spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
    
    • 还有一个问题是本例中drvier和executor使用了相同的kafka_client_jaas.conf,这也会造成一些问题,3中会详细说明

    2. 生成keytab和principal

    • 在KDC Server上执行
      kadmin -p admin/admin@ASIAINFO.COM
    • 生成principal,principal最好使用ocsp的用户名+domain
      addprinc -randkey ocsp/ASIAINFO.COM
    • 生成keytab
      ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM
    • 将keytab文件copy到spark driver所在的机器(因为OCSP默认使用yarn-client模式)

    3. 创建spark读取kafka的jaas配置文件

    • 配置文件kafka_client_jaas.conf样例如下:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=false
      useKeyTab=true
      principal="ocsp@ASIAINFO.COM"
      keyTab="./ocsp.keytab"
      renewTicket=true
      storeKey=true
      serviceName="ocsp";
    };
    
    • 其中useTicketCache指从系统的cash中读取credential信息,useKeyTab指从指定的keyTab文件读取credential

    • principal和keytab用第二步生成的principal与keytab,注意:k�eytab的路径

      • 如果这个conf文件是给driver读取,则我们要用keytab文件在本地的绝对路径
      • 如果这个conf文件是executor读取,则我们要用keytab文件在container中的相对路径,即./ocsp.keytab
      • 如果为了方便起见,drvier与executor要使用相同的jaas文件,路径配置为./ocsp.keytab,我们需要将keytab文件copy到运行spark-submit的当前路径
      • 如果driver和executor要使用不同的jaas文件,则driver的jaas文件中,keytab应为本地绝对路径,executor的jaas文件中,keytab应为相对路径./

    4. 配置spark1.6+kafka0.10 jar包

    <dependency>
                        <groupId>com.hortonworks</groupId>
                        <artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
                        <version>1.0.1</version>
                        <scope>system</scope>
                        <systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
    </dependency>
    

    5. 修改Spark读取Kafka部分

    • 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    • 我们使用的DirectApi读取kafka
    KafkaUtils.createDirectStream[String, String](
            SSC,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))
    

    KafkaParams配置如下:

    val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
                , "key.deserializer" -> classOf[StringDeserializer]
                , "value.deserializer" -> classOf[StringDeserializer]
                , "security.protocol" -> "SASL_PLAINTEXT"
                , "bootstrap.servers" -> "kafka-server1:6667"
                , "group.id" -> "test")
    

    6. 修改Spark写Kafka部分

    • 写kafka调用的是kafka官方的库
    <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka_2.11</artifactId>
                    <version>0.10.1.1</version>
    </dependency>
    
    • 代码中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
    val props = new Properties()
          props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          if (MainFrameConf.KERBEROS_ENABLE == "true"){
            props.put("security.protocol","SASL_PLAINTEXT")
          }
    new KafkaProducer[String, String](props)
    

    相关文章

      网友评论

        本文标题:Spark-Streaming Kafka In Kerbero

        本文链接:https://www.haomeiwen.com/subject/ggbxlxtx.html