Data Lake: Basic Operations with Delta Lake


Author: wudl | Published: 2021-06-09 23:58


    1.1 Official website




    1.2 Features:

    3. High-performance analytics: engines such as Spark, MapReduce, and Spark SQL can analyze massive volumes of data.

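    1.3 Data lake vs. data warehouse vs. data mart

    Aspect        | Data warehouse                        | Data mart                       | Data lake
    Scope         | Entire company                        | A department or team            | Entire company
    Data types    | Structured data                       | Structured data                 | Data in any format
    Storage scale | Large                                 | Medium (a small data warehouse) | Massive
    Applications  | Dimensional modeling, metric analysis | Small-scale data analysis       | Analysis of massive data in any format, for any type of application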

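    1.4 Schema on write


    1.5 Schema on read


    1.6 Features:

    1. Easy data collection (schema on read): one major difference between a data lake and a data warehouse is that a data lake is Schema On Read. Schema information is needed only when the data is used. A data warehouse is Schema On Write, so the schema must be designed before the data is stored. Because writes to a data lake are not constrained by a schema, a data lake can collect data more easily.

    1.7 Requirements for a data lake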

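    5. Storage in the original format: we define a data lake as a centralized repository of all data in its raw form. The data stored in a data lake is therefore unmodified, raw data.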
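The schema-on-write versus schema-on-read distinction described above can be illustrated with a small, stdlib-only Scala sketch. It is a toy built on stated assumptions. The `name,age` record layout, the `Person` type, and the `parse`, `writeWithSchema`, and `readWithSchema` helpers are all hypothetical and are not part of Spark or Delta Lake. The warehouse-style writer rejects the whole batch as soon as one line does not fit the schema. The lake-style store keeps every raw line and lets each reader decide what fits.

```scala
// Toy contrast (hypothetical names, stdlib only) between schema on write and schema on read.
final case class Person(name: String, age: Int)

// Shared parser for the hypothetical "name,age" schema: None when a line does not fit it.
def parse(line: String): Option[Person] = line.split(",", -1) match {
  case Array(name, age) if age.trim.nonEmpty && age.trim.forall(_.isDigit) =>
    Some(Person(name.trim, age.trim.toInt))
  case _ => None
}

// Schema on write (warehouse style): the batch is validated before anything is stored,
// so a single non-conforming line rejects the whole write.
def writeWithSchema(batch: Seq[String]): Either[String, Seq[Person]] =
  batch.find(line => parse(line).isEmpty) match {
    case Some(bad) => Left(s"rejected at write time: $bad")
    case None      => Right(batch.flatMap(line => parse(line)))
  }

// Schema on read (lake style): raw lines are stored untouched; the reader applies
// its schema at query time and skips lines that do not fit.
def readWithSchema(stored: Seq[String]): Seq[Person] =
  stored.flatMap(line => parse(line))

val rawLines = Vector("alice,30", "bob,abc", "carol,25", "free-text log line")
println(writeWithSchema(rawLines)) // Left(rejected at write time: bob,abc)
println(readWithSchema(rawLines))  // Vector(Person(alice,30), Person(carol,25))
```

Skipped lines are not lost in the lake-style version. A different reader with a different `parse` could still interpret them later, which is the flexibility schema on read trades for weaker guarantees at write time.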


        1. Separating data from business logic
        3. Lambda architecture vs. Kappa architecture vs. IOTA architecture
            4.1 Security (Kerberos)

    2. Basic Delta Lake operations

    2.1 Delta Lake features

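    1.  ACID transactions: Delta Lake brings ACID transactions to your data lake. It provides serializability, the strongest isolation level.
    2.  Scalable metadata handling: Delta Lake can easily handle petabyte-scale tables with billions of partitions and files.
    4.  Data versioning: Delta Lake provides data snapshots. Developers can access and revert to earlier versions of the data for audits, rollbacks, or reproducing experiments.
    5.  Open data format: all data in Delta Lake is stored in Apache Parquet format. This lets Delta Lake take advantage of Parquet's native compression and encoding schemes.
    6.  Unified batch and streaming source and sink: a Delta Lake table is both a batch table and a streaming source and sink.
    7.  Schema enforcement: Delta Lake lets you specify and enforce a schema. This helps ensure that data types are correct and required columns are present, which keeps bad data from corrupting the table.
    8.  Schema evolution: big data is constantly changing. Delta Lake lets you change a table's schema and applies the change automatically, without cumbersome DDL.
    9.  Audit history: the Delta Lake transaction log records details of every change made to the data, providing a complete audit trail.
    10. Updates and deletes: Delta Lake supports Scala/Java APIs to merge, update, and delete datasets.
    11. 100% compatible with the Apache Spark API: fully compatible with Spark.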
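Schema enforcement and schema evolution (items 7 and 8) can be sketched with a toy in-memory table. This is not Delta's implementation. `Table`, `append`, and the string type names are hypothetical. The sketch only mimics two ideas: a write with a conflicting column type is always rejected, and a write that adds new columns is rejected unless evolution is explicitly allowed. In real Delta Lake, that opt-in is the `mergeSchema` write option, for example `.option("mergeSchema", "true")` on a DataFrame writer. Check the documentation of your Delta version for the exact rules.

```scala
// Toy model (hypothetical names, not Delta's implementation) of schema enforcement and
// opt-in schema evolution. A schema maps column name -> type name.
final case class Table(schema: Map[String, String], rows: Vector[Map[String, Any]])

def append(table: Table,
           batchSchema: Map[String, String],
           batch: Vector[Map[String, Any]],
           mergeSchema: Boolean = false): Either[String, Table] = {
  // Enforcement: a column whose type differs from the table's type is always rejected.
  val typeConflicts = batchSchema.keys.filter(c => table.schema.get(c).exists(_ != batchSchema(c)))
  // Columns the table does not know about yet.
  val newColumns = batchSchema.keySet -- table.schema.keySet

  if (typeConflicts.nonEmpty)
    Left(s"type mismatch on: ${typeConflicts.toSeq.sorted.mkString(", ")}")
  else if (newColumns.nonEmpty && !mergeSchema)
    Left(s"unknown columns: ${newColumns.toSeq.sorted.mkString(", ")}")
  else // Evolution: new columns join the table schema only when explicitly allowed.
    Right(Table(table.schema ++ batchSchema, table.rows ++ batch))
}

val base  = Table(Map("id" -> "long"), Vector(Map("id" -> 1L)))
val wider = Map("id" -> "long", "name" -> "string")
val batch = Vector(Map[String, Any]("id" -> 2L, "name" -> "b"))

println(append(base, Map("id" -> "string"), Vector(Map("id" -> "x"))))   // Left(type mismatch on: id)
println(append(base, wider, batch))                                     // Left(unknown columns: name)
println(append(base, wider, batch, mergeSchema = true).map(_.schema == wider)) // Right(true)
```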

    2.2 Delta Lake operations in the Spark Scala shell (the only requirement is Spark 2.4.2 or later)

    bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0


    [root@master01 spark-2.4.7-bin-hadoop2.7]# bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0
    Ivy Default Cache set to: /root/.ivy2/cache
    The jars for the packages stored in: /root/.ivy2/jars
    :: loading settings :: url = jar:file:/opt/module/spark-2.4.7-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    io.delta#delta-core_2.11 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc;1.0
        confs: [default]
        found io.delta#delta-core_2.11;0.5.0 in central
        found org.antlr#antlr4;4.7 in central
        found org.antlr#antlr4-runtime;4.7 in central
        found org.antlr#antlr-runtime;3.5.2 in central
        found org.antlr#ST4;4.0.8 in central
        found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
        found org.glassfish#javax.json;1.0.4 in central
        found com.ibm.icu#icu4j;58.2 in central
    :: resolution report :: resolve 376ms :: artifacts dl 6ms
        :: modules in use:
        com.ibm.icu#icu4j;58.2 from central in [default]
        io.delta#delta-core_2.11;0.5.0 from central in [default]
        org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
        org.antlr#ST4;4.0.8 from central in [default]
        org.antlr#antlr-runtime;3.5.2 from central in [default]
        org.antlr#antlr4;4.7 from central in [default]
        org.antlr#antlr4-runtime;4.7 from central in [default]
        org.glassfish#javax.json;1.0.4 from central in [default]
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        |      default     |   8   |   0   |   0   |   0   ||   8   |   0   |
    :: retrieving :: org.apache.spark#spark-submit-parent-811cb329-3b4b-4a62-ab7a-d2287a1901dc
        confs: [default]
        0 artifacts copied, 8 already retrieved (0kB/9ms)
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    21/06/09 23:27:26 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    Spark context Web UI available at http://master01.pxx.com:4041
    Spark context available as 'sc' (master = local[*], app id = local-1623252446880).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.7
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala> val data = spark.range(0, 5)

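    2.3 Command from the official documentation:

    bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
    You can also start the shell with just bin/spark-shell --packages io.delta:delta-core_2.12:1.0.0. The two --conf settings register Delta's SQL extension and catalog. Note that delta-core_2.12:1.0.0 is built for Spark 3.1 (Scala 2.12). On Spark 2.4.x, use a 0.x release such as the delta-core_2.11:0.5.0 package used above.

    2.4 Following the official commands

    1. Create a table and read it back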

    scala> val data = spark.range(0, 5)
    data: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    scala> data.write.format("delta").save("/tmp/delta-table02")                                                                         
    scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
    | id|
    |  2|
    |  0|
    |  4|
    |  3|
    |  1|
    2. Update the table (by overwriting it)
    scala> val data01 = spark.range(5,10)
    data01: org.apache.spark.sql.Dataset[Long] = [id: bigint]
    scala> data01.write.format("delta").mode("overwrite").save("/tmp/delta-table02")                                                                             
    scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
    | id|
    |  8|
    |  7|
    |  5|
    |  6|
    |  9|
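The overwrite above does not destroy the first write. Delta records every write as a commit in a transaction log, which is what makes the data versioning and audit history features listed earlier possible. The sketch below is a greatly simplified, hypothetical model in plain Scala. `ToyDeltaTable`, `Commit`, and their methods are illustrative names, not Delta classes. In this model, data files are never modified. Each commit records which files it adds and which it logically removes, and a snapshot is obtained by replaying the log up to a given version. Real Delta stores these commits as JSON files in the table's `_delta_log` directory. In releases that support time travel, an older snapshot can be read with the `versionAsOf` read option, e.g. `spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table02")`.

```scala
import scala.collection.mutable

// Toy model (hypothetical, greatly simplified) of a Delta-style transaction log: data files
// are immutable, and each commit only records which files it adds and which it removes.
final case class Commit(version: Int, operation: String, added: Set[String], removed: Set[String])

final class ToyDeltaTable {
  private val files = mutable.Map.empty[String, Vector[Long]] // "data files"; never physically deleted
  private val log   = mutable.ArrayBuffer.empty[Commit]        // append-only commit log

  def latestVersion: Int = log.size - 1

  // Replaying commits 0..version yields the set of files that make up that snapshot.
  private def liveFiles(version: Int): Set[String] =
    log.take(version + 1).foldLeft(Set.empty[String])((live, c) => live -- c.removed ++ c.added)

  // mode is "append" or "overwrite", mirroring the save modes used in the shell session.
  def write(rows: Seq[Long], mode: String): Int = {
    val file = s"part-${files.size}.parquet"
    files(file) = rows.toVector
    val removed = if (mode == "overwrite") liveFiles(latestVersion) else Set.empty[String]
    log += Commit(log.size, s"WRITE ($mode)", Set(file), removed)
    latestVersion
  }

  // No argument: latest snapshot. Explicit version: "time travel" to an older snapshot.
  def read(version: Int = latestVersion): Vector[Long] =
    liveFiles(version).toVector.flatMap(f => files(f)).sorted

  // Newest commit first, like a history listing.
  def history: List[(Int, String)] = log.reverse.map(c => (c.version, c.operation)).toList
}

// Replaying the two writes of the session above (the first write goes to an empty table).
val table = new ToyDeltaTable
table.write(0L until 5L, "append")     // version 0
table.write(5L until 10L, "overwrite") // version 1

println(table.read())  // Vector(5, 6, 7, 8, 9)
println(table.read(0)) // Vector(0, 1, 2, 3, 4)
println(table.history) // List((1,WRITE (overwrite)), (0,WRITE (append)))
```

In this model, `overwrite` only marks the old file as removed in the log, so `read(0)` can still see it. Physically deleting old files, which Delta does with `VACUUM`, would end that ability.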

    3. Delta Lake provides programmatic APIs for conditionally updating, deleting, and merging (upserting) data into a table

    scala> import io.delta.tables._
    import io.delta.tables._
    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    scala> val deltaTable = DeltaTable.forPath("/tmp/delta-table02")
    deltaTable: io.delta.tables.DeltaTable = io.delta.tables.DeltaTable@5e88e2e7
    // Update every even value by adding 100 to it
    scala> deltaTable.update(condition=expr("id % 2 ==0"), set = Map("id"->expr("id+100")))
    scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
    | id|
    |  7|
    |  5|
    |  9|
    // Delete every even value
    scala> deltaTable.delete(condition = expr("id % 2 ==0"))
    scala> spark.read.format("delta").load("/tmp/delta-table02").toDF.show()
    | id|
    |  7|
    |  5|
    |  9|
    scala> val newData = spark.range(0,20).toDF
    newData: org.apache.spark.sql.DataFrame = [id: bigint]
    // Merge (upsert) the new data
    scala> deltaTable.as("oldData").merge(newData.as("newData"),"oldData.id=newData.id").whenMatched.update(Map("id" -> col("newData.id"))).whenNotMatched.insert(Map("id" ->col("newData.id"))).execute()
    [Stage 86:===================================>                 (135 + 51) / 200]21/06/09 23:51:11 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
    Scaling row group sizes to 96.54% for 7 writers
    scala> deltaTable.toDF.show()
    | id|
    |  0|
    |  2|
    |  6|
    |  1|
    | 10|
    | 11|
    | 15|
    | 12|
    |  4|
    | 19|
    | 14|
    |  5|
    |  9|
    | 13|
    |  8|
    | 18|
    | 16|
    |  7|
    |  3|
    | 17|
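The update, delete, and merge steps above can be replayed on a plain in-memory `Vector` to check what each step should produce. This is a hypothetical, stdlib-only model. The `update`, `delete`, and `merge` helpers are illustrative, not Delta's API; Delta itself rewrites Parquet files and commits to its log. Under this model, the table after `update` still holds five rows, with 106 and 108 in place of 6 and 8. The following `delete` then reduces it to 5, 7, 9. The merge with 0 to 19 yields exactly the ids 0 to 19, which matches the final `show()`.

```scala
// Toy in-memory replay of the session above (hypothetical helpers, not Delta's API).

// update: rows matching the condition are rewritten; the rest are kept as-is.
def update(rows: Vector[Long], condition: Long => Boolean, set: Long => Long): Vector[Long] =
  rows.map(id => if (condition(id)) set(id) else id)

// delete: rows matching the condition are dropped.
def delete(rows: Vector[Long], condition: Long => Boolean): Vector[Long] =
  rows.filterNot(condition)

// merge on key(target) == key(source):
//   whenMatched.update    -> the target row is replaced by the matching source row
//   whenNotMatched.insert -> source rows with no matching target row are appended
def merge[K, R](target: Vector[R], source: Vector[R])(key: R => K): Vector[R] = {
  val sourceByKey = source.map(r => key(r) -> r).toMap
  val targetKeys  = target.map(key).toSet
  val updated  = target.map(t => sourceByKey.getOrElse(key(t), t))
  val inserted = source.filterNot(s => targetKeys.contains(key(s)))
  updated ++ inserted
}

val isEven: Long => Boolean = _ % 2 == 0

val afterOverwrite = Vector(5L, 6L, 7L, 8L, 9L)                 // state after mode("overwrite")
val afterUpdate    = update(afterOverwrite, isEven, _ + 100)     // 6 -> 106, 8 -> 108
val afterDelete    = delete(afterUpdate, isEven)                 // 106 and 108 are still even
val afterMerge     = merge(afterDelete, (0L until 20L).toVector)(id => id)

println(afterUpdate)                                  // Vector(5, 106, 7, 108, 9)
println(afterDelete)                                  // Vector(5, 7, 9)
println(afterMerge.sorted == (0L until 20L).toVector) // true
```

One design caveat: `toMap` silently keeps the last source row when several rows share a key. Delta's merge instead fails when multiple source rows match and try to update the same target row. This sketch therefore assumes the source keys are unique.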



