美文网首页
sparkStreaming向hive动态分区落地数据(IDEA

sparkStreaming向hive动态分区落地数据(IDEA

作者: 早点起床晒太阳 | 来源:发表于2020-07-22 15:20 被阅读0次

前言

做电信项目有时有这种场景,将sparkStreaming读取kafka的数据经过过滤,筛选处理,广播、join等一系列操作后,根据每次批的间隔时间,把数据动态插入hive表的分区中,这边我IDEA本地进行先关测试

sparkStreaming本地访问hive遇到的问题

1、读取不到hive表

需要将hive-site.xml hdfs-site.xml core-site.xml 放到resource目录下 或者使用spark显示的加入配置文件 (我这边是直接放到resource目录下,比较方便)

2、执行spark.sql(插入表等操作的时候)时用户不对,没有权限(Permission denied: user=Administrator, access=WRITE)

这个时候是因为用户不对,错误的将电脑的用户当成了hive的用户
解决办法是: 添加System.setProperty("HADOOP_USER_NAME", "hadoop") 将用户设置成指定的用户(我这里设置的是hadoop)

这里注意的是System.setProperty("HADOOP_USER_NAME", "hadoop") 这一行代码必须要在sparksession创建之前设置,不然不生效

  System.setProperty("HADOOP_USER_NAME", "hadoop")

  val session = SparkSession
    .builder().enableHiveSupport().master("local[*]").getOrCreate()

编写代码遇到的问题

1、替换变量

我在插入动态分区的时候,需要将partition的时间动态的传入进去,这个就需要用到scala的语法 s前缀,它可以来进行变量替换
比如

    def test() = {
        val word = "hello"
        println(s"$word, world")
    }
2、sql整体都在一行,让sql变成多行方便查看

这时候使用stripMargin 来解决问题 类似于这种形式

      var result=spark.sql(
        s"""
          |insert into ceshi.fenqu partition (p=$p)
          |select ccard,cname,ename,phone,email,address from moniaaa
        """.stripMargin)

相关文章

网友评论

      本文标题:sparkStreaming向hive动态分区落地数据(IDEA

      本文链接:https://www.haomeiwen.com/subject/oqftlktx.html