美文网首页
sparkStreaming向hive动态分区落地数据(IDEA

sparkStreaming向hive动态分区落地数据(IDEA

作者: 早点起床晒太阳 | 来源:发表于2020-07-22 15:20 被阅读0次

    前言

    做电信项目有时有这种场景,将sparkStreaming读取kafka的数据经过过滤,筛选处理,广播、join等一系列操作后,根据每次批的间隔时间,把数据动态插入hive表的分区中,这边我IDEA本地进行先关测试

    sparkStreaming本地访问hive遇到的问题

    1、读取不到hive表

    需要将hive-site.xml hdfs-site.xml core-site.xml 放到resource目录下 或者使用spark显示的加入配置文件 (我这边是直接放到resource目录下,比较方便)

    2、执行spark.sql(插入表等操作的时候)时用户不对,没有权限(Permission denied: user=Administrator, access=WRITE)

    这个时候是因为用户不对,错误的将电脑的用户当成了hive的用户
    解决办法是: 添加System.setProperty("HADOOP_USER_NAME", "hadoop") 将用户设置成指定的用户(我这里设置的是hadoop)

    这里注意的是System.setProperty("HADOOP_USER_NAME", "hadoop") 这一行代码必须要在sparksession创建之前设置,不然不生效

      System.setProperty("HADOOP_USER_NAME", "hadoop")
    
      val session = SparkSession
        .builder().enableHiveSupport().master("local[*]").getOrCreate()
    

    编写代码遇到的问题

    1、替换变量

    我在插入动态分区的时候,需要将partition的时间动态的传入进去,这个就需要用到scala的语法 s前缀,它可以来进行变量替换
    比如

        def test() = {
            val word = "hello"
            println(s"$word, world")
        }
    
    2、sql整体都在一行,让sql变成多行方便查看

    这时候使用stripMargin 来解决问题 类似于这种形式

          var result=spark.sql(
            s"""
              |insert into ceshi.fenqu partition (p=$p)
              |select ccard,cname,ename,phone,email,address from moniaaa
            """.stripMargin)
    

    相关文章

      网友评论

          本文标题:sparkStreaming向hive动态分区落地数据(IDEA

          本文链接:https://www.haomeiwen.com/subject/oqftlktx.html