美文网首页Hadoop
101.Spark2Streaming在Kerberos环境下的

101.Spark2Streaming在Kerberos环境下的

作者: 大勇任卷舒 | 来源:发表于2022-02-14 11:08 被阅读0次

101.1 演示环境介绍

  • CM版本:5.14.3
  • CDH版本:5.14.2
  • CDK版本:2.2.0
  • Apache Kafka版本:0.10.2
  • SPARK版本:2.2.0
  • Redhat版本:7.3
  • 已启用Kerberos,用root用户进行操作

101.2 操作演示

1.准备环境

  • 使用xst命令导出keytab文件,准备访问Kafka的Keytab文件
[root@cdh01 ~]# kadmin.local 
Authenticating as principal hbase/admin@FAYSON.COM with password.
kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM
  • 用klist命令检查导出的keytab文件是否正确
[root@cdh01 ~]# klist -ek fayson.keytab
  • jaas.cof文件内容
    • 把fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};
  • 根据需求将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:
{
   "occupation": "劳动者、运输工作和部分体力生产工作",
   "address": "山东东三路18号-6-6",
   "city": "长江",
   "marriage": "1",
   "sex": "1",
   "name": "魏淑芬",
   "mobile_phone_num": "13508268580",
   "bank_name": "广发银行32",
   "id": "510105198906185189",
   "child_num": "1",
   "fix_phone_num": "16004180180"
}
  • 把SPARK2f服务的配置项将spark_kafka_version的kafka版本修改为0.10


2.SparkStreaming开发

  • pom.xml依赖
    • 使用maven创建scala语言的spark2demo
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.6.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0.cloudera2</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
  • 在resources下创建0288.properties配置文件
kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=Kafka_kudu_topic
kudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com
  • 创建Kafka2Spark2Kudu.scala文件
package com.cloudera.streaming

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSON

/**
  * package: com.cloudera.streaming
  * describe: Kerberos环境中Spark2Streaming 应用实时读取Kafka数据,解析后存入Kudu
  * 使用spark2-submit的方式提交作业
    spark2-submit --class com.cloudera.streaming.Kafka2Spark2Kudu \
    --master yarn \
    --deploy-mode client \
    --executor-memory 2g \
    --executor-cores 2 \
    --driver-memory 2g \
    --num-executors 2 \
    --queue default  \
    --principal fayson@FAYSON.COM \
    --keytab /data/disk1/0286-kafka-shell/conf/fayson.keytab \
    --driver-java-options "-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" \
    spark2-demo-1.0-SNAPSHOT.jar
  * 公众号:碧茂大数据
  */
object Kafka2Spark2Kudu {
  Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别

  var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0288.properties"

  /**
    * 建表Schema定义
    */
  val userInfoSchema = StructType(
      //         col name   type     nullable?
      StructField("id", StringType , false) ::
      StructField("name" , StringType, true ) ::
      StructField("sex" , StringType, true ) ::
      StructField("city" , StringType, true ) ::
      StructField("occupation" , StringType, true ) ::
      StructField("tel" , StringType, true ) ::
      StructField("fixPhoneNum" , StringType, true ) ::
      StructField("bankName" , StringType, true ) ::
      StructField("address" , StringType, true ) ::
      StructField("marriage" , StringType, true ) ::
      StructField("childNum", StringType , true ) :: Nil
  )

  /**
    * 定义一个UserInfo对象
    */
  case class UserInfo (
    id: String,
    name: String,
    sex: String,
    city: String,
    occupation: String,
    tel: String,
    fixPhoneNum: String,
    bankName: String,
    address: String,
    marriage: String,
    childNum: String
  )

  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath)
    if(!file.exists()) {
      System.out.println(Kafka2Spark2Kudu.getClass.getClassLoader.getResource("0288.properties"))
      val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0288.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(confPath))
    }

    val brokers = properties.getProperty("kafka.brokers")
    val topics = properties.getProperty("kafka.topics")
    val kuduMaster = properties.getProperty("kudumaster.list")
    println("kafka.brokers:" + brokers)
    println("kafka.topics:" + topics)
    println("kudu.master:" + kuduMaster)

    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(kuduMaster)) {
      println("未配置Kafka和KuduMaster信息")
      System.exit(0)
    }
    val topicsSet = topics.split(",").toSet

    val spark = SparkSession.builder().appName("Kafka2Spark2Kudu-kerberos").config(new SparkConf()).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
      , "auto.offset.reset" -> "latest"
      , "security.protocol" -> "SASL_PLAINTEXT"
      , "sasl.kerberos.service.name" -> "kafka"
      , "key.deserializer" -> classOf[StringDeserializer]
      , "value.deserializer" -> classOf[StringDeserializer]
      , "group.id" -> "testgroup"
    )


    val dStream = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    //引入隐式
    import spark.implicits._
    val kuduContext = new KuduContext(kuduMaster, spark.sparkContext)

    //判断表是否存在
    if(!kuduContext.tableExists("user_info")) {
      println("create Kudu Table :{user_info}")
      val createTableOptions = new CreateTableOptions()
      createTableOptions.addHashPartitions(List("id").asJava, 8).setNumReplicas(3)
      kuduContext.createTable("user_info", userInfoSchema, Seq("id"), createTableOptions)
    }

    dStream.foreachRDD(rdd => {
      //将rdd数据重新封装为Rdd[UserInfo]
      val newrdd = rdd.map(line => {
        val jsonObj =  JSON.parseFull(line.value())
        val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
        new UserInfo(
          map.get("id").get.asInstanceOf[String],
          map.get("name").get.asInstanceOf[String],
          map.get("sex").get.asInstanceOf[String],
          map.get("city").get.asInstanceOf[String],
          map.get("occupation").get.asInstanceOf[String],
          map.get("mobile_phone_num").get.asInstanceOf[String],
          map.get("fix_phone_num").get.asInstanceOf[String],
          map.get("bank_name").get.asInstanceOf[String],
          map.get("address").get.asInstanceOf[String],
          map.get("marriage").get.asInstanceOf[String],
          map.get("child_num").get.asInstanceOf[String]
        )
      })
      //将RDD转换为DataFrame
      val userinfoDF = spark.sqlContext.createDataFrame(newrdd)
      kuduContext.upsertRows(userinfoDF, "user_info")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 使用mvn命令编译工程
    • 由于是scala工程编译时mvn命令要加scala:compile
mvn clean scala:compile package
  • 将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务
    • 在conf目录下新增0288.properties配置文件

3.运行

  • 用spark2-submit命令向集群提交SparkStreaming作业
spark2-submit --class com.cloudera.streaming.Kafka2Spark2Kudu \
--master yarn \
--deploy-mode client \
--executor-memory 2g \
--executor-cores 2 \
--driver-memory 2g \
--num-executors 2 \
--queue default  \
--principal fayson@FAYSON.COM \
--keytab /data/disk1/0286-kafka-shell/conf/fayson.keytab \
--driver-java-options "-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" \
spark2-demo-1.0-SNAPSHOT.jar
  • 通过CM查看作业是否提交成功
  • 通过Kudu Master的管理界面可以看到user_info表已创建
    • 点击Table Id列进入user_info表详情页,获取Impala的建表语句:
CREATE EXTERNAL TABLE `user_info` STORED AS KUDU
TBLPROPERTIES(
    'kudu.table_name' = 'user_info',
    'kudu.master_addresses' = 'cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051')
  • 运行脚本向Kafka的Kafka_kudu_topic生产消息
  • 登录Hue在Impala中执行上面的建表语句


  • 执行Select查询user_info表中数据,数据已成功入库

    4.总结
  • Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10
  • 在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数
  • jaas.conf文件Fayson通过spark2-submit的方式指定,jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的
  • 在/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下需要检查下是否有其它版本的spark-streaming-kafka的依赖包,如果存在需要删除,否则会出现版本冲突问题

大数据视频推荐:
腾讯课堂
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

相关文章

网友评论

    本文标题:101.Spark2Streaming在Kerberos环境下的

    本文链接:https://www.haomeiwen.com/subject/qndakrtx.html