美文网首页
在spark structuredStreaming中使用hiv

在spark structuredStreaming中使用hiv

作者: JX907 | 来源:发表于2021-02-09 12:23 被阅读0次

<meta charset="utf-8">

基于structuredStreaming的流任务中需要将hive小表(一般是维度表)参与到流计算的处理中,以下研究是证明spark读取hive数据的及时性,即hive数据变动后在sparksql中能正确读取到最新的hive数据,同时考虑性能,尝试将hive表数据做缓存,并且维护缓存的更新。

1、不做缓存

代码:

while (true) {
          sc.catalog().refreshTable("beian.analyze_ip1");
          Dataset<Row> dataset1 = sc.sql("select * from beian.analyze_ip1");
          //  dataset1.cache();
          LOG.error("总数:" +  dataset1.toJavaRDD().collect().size());
          try {
              Thread.sleep(10000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }

操作步骤:

1、清空表beian.analyze_ip1

2、启动saprk程序;

3、当“总数:0”出现后在hue执行写入表的sql,使表中有数据;

结果:

1.png

注意:必须使用sc.catalog().refreshTable()方法刷新元数据,因为spark默认在启动时缓存了hive的元数据,如果在运行时hive表的数据有了变动,比如增加了分区,那么必须代码显示刷新获取最新的元数据。

2、加缓存

在不加缓存情况下spark每次都要去读取hive文件,流任务的速度远远大于hive文件读取速度,因此需要加缓存,加了缓存后还需要定时维护缓存中的内容。

代码:

while (true) {
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    dataset1.unpersist(true);
    sc.catalog().refreshTable("beian.analyze_ip1");
    Dataset<Row> dataset2 = sc.sql("select * from beian.analyze_ip1");
    dataset1 = dataset2;
    dataset1.persist(StorageLevel.MEMORY_ONLY());
    LOG.error("总数:" +  dataset1.toJavaRDD().collect().size());
}

注意:创建dataset2再赋值给dataset1是必须的; 上述while中的内容可以写到定时器中定时执行。

操作步骤同上

结果:

2.png

相关文章

网友评论

      本文标题:在spark structuredStreaming中使用hiv

      本文链接:https://www.haomeiwen.com/subject/fhaqxltx.html