<meta charset="utf-8">
基于structuredStreaming的流任务中需要将hive小表(一般是维度表)参与到流计算的处理中,以下研究是证明spark读取hive数据的及时性,即hive数据变动后在sparksql中能正确读取到最新的hive数据,同时考虑性能,尝试将hive表数据做缓存,并且维护缓存的更新。
1、不做缓存
代码:
while (true) {
sc.catalog().refreshTable("beian.analyze_ip1");
Dataset<Row> dataset1 = sc.sql("select * from beian.analyze_ip1");
// dataset1.cache();
LOG.error("总数:" + dataset1.toJavaRDD().collect().size());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
操作步骤:
1、清空表beian.analyze_ip1
2、启动saprk程序;
3、当“总数:0”出现后在hue执行写入表的sql,使表中有数据;
结果:
1.png注意:必须使用sc.catalog().refreshTable()方法刷新元数据,因为spark默认在启动时缓存了hive的元数据,如果在运行时hive表的数据有了变动,比如增加了分区,那么必须代码显示刷新获取最新的元数据。
2、加缓存
在不加缓存情况下spark每次都要去读取hive文件,流任务的速度远远大于hive文件读取速度,因此需要加缓存,加了缓存后还需要定时维护缓存中的内容。
代码:
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
dataset1.unpersist(true);
sc.catalog().refreshTable("beian.analyze_ip1");
Dataset<Row> dataset2 = sc.sql("select * from beian.analyze_ip1");
dataset1 = dataset2;
dataset1.persist(StorageLevel.MEMORY_ONLY());
LOG.error("总数:" + dataset1.toJavaRDD().collect().size());
}
注意:创建dataset2再赋值给dataset1是必须的; 上述while中的内容可以写到定时器中定时执行。
操作步骤同上
结果:
2.png
网友评论