前言
做电信项目有时有这种场景,将sparkStreaming读取kafka的数据经过过滤,筛选处理,广播、join等一系列操作后,根据每次批的间隔时间,把数据动态插入hive表的分区中,这边我IDEA本地进行先关测试
sparkStreaming本地访问hive遇到的问题
1、读取不到hive表
需要将hive-site.xml hdfs-site.xml core-site.xml 放到resource目录下 或者使用spark显示的加入配置文件 (我这边是直接放到resource目录下,比较方便)
2、执行spark.sql(插入表等操作的时候)时用户不对,没有权限(Permission denied: user=Administrator, access=WRITE)
这个时候是因为用户不对,错误的将电脑的用户当成了hive的用户
解决办法是: 添加System.setProperty("HADOOP_USER_NAME", "hadoop") 将用户设置成指定的用户(我这里设置的是hadoop)
这里注意的是System.setProperty("HADOOP_USER_NAME", "hadoop") 这一行代码必须要在sparksession创建之前设置,不然不生效
System.setProperty("HADOOP_USER_NAME", "hadoop")
val session = SparkSession
.builder().enableHiveSupport().master("local[*]").getOrCreate()
编写代码遇到的问题
1、替换变量
我在插入动态分区的时候,需要将partition的时间动态的传入进去,这个就需要用到scala的语法 s前缀,它可以来进行变量替换
比如
def test() = {
val word = "hello"
println(s"$word, world")
}
2、sql整体都在一行,让sql变成多行方便查看
这时候使用stripMargin 来解决问题 类似于这种形式
var result=spark.sql(
s"""
|insert into ceshi.fenqu partition (p=$p)
|select ccard,cname,ename,phone,email,address from moniaaa
""".stripMargin)
网友评论