Reading Kafka Data with Spark and Writing It to Hive


Author: chen_666 | Published 2020-05-05 20:57

    1. Copy hdfs-site.xml, core-site.xml, and hive-site.xml into the project's resources directory (see the sketch below).

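    A minimal sketch of that copy step (the Hadoop path matches the start scripts further down; the Hive config path and the Maven project layout are assumptions, so adjust them to your setup):

    cp /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml src/main/resources/
    cp /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml src/main/resources/
    cp /opt/module/hive/conf/hive-site.xml src/main/resources/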

    2. Add the Maven dependencies

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-service</artifactId>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.27</version>
            </dependency>
        </dependencies>
    

    3. Write the code

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaDemo {
    
      def main(args: Array[String]): Unit = {
    
        // 1. Create the SparkConf and initialize the StreamingContext
        val sparkConf: SparkConf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("KafkaSparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(20))
    
        // 2. Define the Kafka parameters
        val brokers = "s201:9092"
        val consumerGroup = "spark"
    
        // 3. Put the Kafka parameters into a Map
        val kafkaParams = Map[String, String](
          "bootstrap.servers" -> brokers,
          "group.id" -> consumerGroup,
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        // Topics to subscribe to; several can be listed at once
        val topics = Array("student")

        // 4. Create the direct stream through KafkaUtils
        val dstream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

        dstream.foreachRDD(rdd => {
          // Partition and offset information for this batch
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // Keep only the message values
          val events: RDD[String] = rdd.map(_.value())

          val warehouseLocation = "spark-warehouse"
          // getOrCreate() reuses the same SparkSession across batches
          val spark = SparkSession
            .builder()
            .appName("Spark Hive Example")
            .enableHiveSupport()
            .config("spark.sql.warehouse.dir", warehouseLocation)
            .config("user.name", "hadoop")
            .config("HADOOP_USER_NAME", "hive")
            .getOrCreate()

          import spark.sql
          // Enable dynamic partitioning in Hive
          sql("set hive.exec.dynamic.partition=true")
          // Use non-strict mode for dynamic partitioning
          sql("set hive.exec.dynamic.partition.mode=nonstrict")

          // Could this be done by converting the data to a Seq and importing
          // spark.implicits._? No, so the Rows are built explicitly here.
          val dataRow = events.map(line => {
            val temp = line.split("###")
            Row(temp(0), temp(1), temp(2))
          })

          // Define the schema: name, age, major (all strings)
          val structType = StructType(Array(
            StructField("name", StringType, true),
            StructField("age", StringType, true),
            StructField("major", StringType, true)
          ))
          // Build the DataFrame
          val df = spark.createDataFrame(dataRow, structType)

          df.createOrReplaceTempView("jk_device_info")

          sql("insert into myhive.student select * from jk_device_info")

          // Commit the consumed offsets back to Kafka once the batch is written
          dstream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
        })

        // 5. Start SparkStreaming
        ssc.start()
        ssc.awaitTermination()
      }
    }
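
    The insert above targets myhive.student, so that database and table have to exist before the job runs. A minimal way to create them from the Hive CLI (a sketch: the three string columns mirror the schema built in the code; storage format and location are left at their defaults and may need adjusting):

    hive -e "
      CREATE DATABASE IF NOT EXISTS myhive;
      CREATE TABLE IF NOT EXISTS myhive.student(
        name  STRING,
        age   STRING,
        major STRING
      );
    "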
    

    Start Hadoop, ZooKeeper, and Kafka

    /opt/module/hadoop-2.7.2/sbin/start-dfs.sh
    /opt/module/hadoop-2.7.2/sbin/start-yarn.sh

    zk.sh start

    #! /bin/bash
    case $1 in
    "start"){
        for i in s201 s202 s203
        do
            ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
        done
    };;
    "stop"){
        for i in s201 s202 s203
        do
            ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
        done
    };;
    "status"){
        for i in s201 s202 s203
        do
            ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
        done
    };;
    esac
    

    kf.sh start

    #! /bin/bash
    case $1 in
    "start"){
            for i in s201 s202 s203
            do
                    echo " -------- starting Kafka on $i -------"
                    # JMX port exported for KafkaManager monitoring
                    ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
            done
    };;
    "stop"){
            for i in s201 s202 s203
            do
                    echo " -------- stopping Kafka on $i -------"
                    ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
            done
    };;
    esac
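
    If topic auto-creation is disabled on the brokers, create the student topic before producing to it. A sketch (the ZooKeeper address reuses host s201 from the scripts above; partition and replication counts are placeholder values):

    /opt/module/kafka/bin/kafka-topics.sh --create --zookeeper s201:2181 \
      --topic student --partitions 3 --replication-factor 2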
    

    Send a test message with the Kafka console producer; the three fields are separated by ###, matching the split in the code.

    bin/kafka-console-producer.sh --broker-list s201:9092 --topic student

    xiekai###24###ningdu
    

    Check in Hive whether the row was inserted.
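
    A quick check from the Hive CLI (a sketch; the database and table names come from the code above):

    hive -e "SELECT * FROM myhive.student;"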

    xiekai 24 ningdu



    The insert succeeded.

    Note: the job may fail with an HDFS permission error at runtime, so a runtime parameter is needed to set the Hadoop user; see the sketch below.

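    One common way to pass that parameter (an assumption about the exact setting; adjust the user name to one with write access to the Hive warehouse in HDFS) is to set the Hadoop user through an environment variable or a JVM option:

    # before spark-submit:
    export HADOOP_USER_NAME=hive
    # or, when running from the IDE, add this as a VM option:
    # -DHADOOP_USER_NAME=hive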
