美文网首页
在spark structuredStreaming中使用hiv

在spark structuredStreaming中使用hiv

作者: JX907 | 来源:发表于2021-02-09 12:23 被阅读0次

    <meta charset="utf-8">

    基于structuredStreaming的流任务中需要将hive小表(一般是维度表)参与到流计算的处理中,以下研究是证明spark读取hive数据的及时性,即hive数据变动后在sparksql中能正确读取到最新的hive数据,同时考虑性能,尝试将hive表数据做缓存,并且维护缓存的更新。

    1、不做缓存

    代码:

    while (true) {
              sc.catalog().refreshTable("beian.analyze_ip1");
              Dataset<Row> dataset1 = sc.sql("select * from beian.analyze_ip1");
              //  dataset1.cache();
              LOG.error("总数:" +  dataset1.toJavaRDD().collect().size());
              try {
                  Thread.sleep(10000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
    

    操作步骤:

    1、清空表beian.analyze_ip1

    2、启动saprk程序;

    3、当“总数:0”出现后在hue执行写入表的sql,使表中有数据;

    结果:

    1.png

    注意:必须使用sc.catalog().refreshTable()方法刷新元数据,因为spark默认在启动时缓存了hive的元数据,如果在运行时hive表的数据有了变动,比如增加了分区,那么必须代码显示刷新获取最新的元数据。

    2、加缓存

    在不加缓存情况下spark每次都要去读取hive文件,流任务的速度远远大于hive文件读取速度,因此需要加缓存,加了缓存后还需要定时维护缓存中的内容。

    代码:

    while (true) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        dataset1.unpersist(true);
        sc.catalog().refreshTable("beian.analyze_ip1");
        Dataset<Row> dataset2 = sc.sql("select * from beian.analyze_ip1");
        dataset1 = dataset2;
        dataset1.persist(StorageLevel.MEMORY_ONLY());
        LOG.error("总数:" +  dataset1.toJavaRDD().collect().size());
    }
    

    注意:创建dataset2再赋值给dataset1是必须的; 上述while中的内容可以写到定时器中定时执行。

    操作步骤同上

    结果:

    2.png

    相关文章

      网友评论

          本文标题:在spark structuredStreaming中使用hiv

          本文链接:https://www.haomeiwen.com/subject/fhaqxltx.html