美文网首页Spark & FlinkFlink1.13
Flink SQL Connector(二)- JDBC & H

Flink SQL Connector(二)- JDBC & H

作者: Alex90 | 来源:发表于2021-06-16 14:17 被阅读0次

    JDBC

    更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/

    Source:Bounded

    Sink:Batch & Streaming Append / Upsert Mode

    LookupSource: SyncMode

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.11</artifactId>
      <version>1.13.0</version>
    </dependency>
    

    在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

    Driver Group Id Artifact Id
    MySQL mysql mysql-connector-java
    PostgreSQL org.postgresql postgresql
    Derby org.apache.derby derby

    JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。

    如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

    Create SQL

    -- 在 Flink SQL 中注册一张 MySQL 表 'users'
    CREATE TABLE MyUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:mysql://localhost:3306/mydatabase',
       'table-name' = 'users'
    );
    
    -- 从另一张表 "T" 将数据写入到 JDBC 表中
    INSERT INTO MyUserTable
    SELECT id, name, age, status FROM T;
    
    -- 查看 JDBC 表中的数据
    SELECT id, name, age, status FROM MyUserTable;
    
    -- JDBC 表在时态表关联中作为维表
    SELECT * FROM myTopic
    LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
    ON myTopic.key = MyUserTable.id;
    

    Connector Options

    参数 是否必填 默认值 类型 描述
    connector 必填 (none) String 指定使用什么类型的连接器,这里应该是 'jdbc'。
    url 必填 (none) String JDBC 数据库 url。
    table-name 必填 (none) String 连接到 JDBC 表的名称。
    driver 可选 (none) String 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
    username 可选 (none) String JDBC 用户名。 如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。
    password 可选 (none) String JDBC 密码。
    connection.max-retry-timeout 可选 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。
    scan.partition.column 可选 (none) String 用于将输入进行分区的列名。
    scan.partition.num 可选 (none) Integer 分区数。
    scan.partition.lower-bound 可选 (none) Integer 第一个分区的最小值。
    scan.partition.upper-bound 可选 (none) Integer 最后一个分区的最大值。
    scan.fetch-size 可选 0 Integer 每次循环读取时应该从数据库中获取的行数。 如果指定的值为 '0',则该配置项会被忽略。
    scan.auto-commit 可选 true Boolean 在 JDBC 驱动程序上设置 auto-commit 标志,决定了每个语句是否在事务中自动提交。 有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。
    lookup.cache.max-rows 可选 (none) Integer lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。
    lookup.cache.ttl 可选 (none) Duration lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。
    lookup.max-retries 可选 3 Integer 查询数据库失败的最大重试时间。
    sink.buffer-flush.max-rows 可选 100 Integer flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。
    sink.buffer-flush.interval 可选 1s Duration flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。
    sink.max-retries 可选 3 Integer 写入记录到数据库失败后的最大重试次数。
    sink.parallelism 可选 (none) Integer 用于定义 JDBC sink 算子的并行度。 默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。

    特性

    Key 处理

    当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

    在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。

    在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

    分区扫描

    为了加速读取数据,创建并行 Source task,Flink 为 JDBC table 提供了分区扫描的特性。

    如果下述分区扫描参数中的任一项被指定,则所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。

    • scan.partition.column:输入用于进行分区的列名,必须是相关表中的数字、日期或时间戳列。
    • scan.partition.num:分区数(并行度)。
    • scan.partition.lower-bound:第一个分区的最小值(决定分区的起始位置和过滤表中的数据)。
    • scan.partition.upper-bound:最后一个分区的最大值(决定分区的起始位置和过滤表中的数据)。

    Lookup Cache

    JDBC Connector 可以用在时态表关联中作为一个 lookup source (维表、查找表),当前只支持同步的查找模式。

    默认情况下,lookup cache 是未启用的,可以设置 lookup.cache.max-rows 和 lookup.cache.ttl 参数来启用。lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。

    默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

    幂等写入

    如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。

    如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,有助于避免违反数据库主键约束和产生重复数据。

    除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。

    由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:

    Database Upsert Grammar
    MySQL INSERT .. ON DUPLICATE KEY UPDATE ..
    PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

    Hive

    更详细信息参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/

    Source:Bounded

    Sink:Batch & Streaming Append / Upsert Mode

    LookupSource: SyncMode

    Flink 与 Hive 的集成包含两个层面。

    一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过 HiveCatalog 将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用 HiveCatalog 将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

    二是利用 Flink 来读写 Hive 的表。HiveCatalog 的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 仓库。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

    官方强烈建议用户使用 Blink planner 与 Hive 集成。

    Flink 支持以下 Hive 版本。

    1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)

    2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)

    3.1.x(3.1.0、3.1.1、3.1.2)

    某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

    • Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持。
    • Column 约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
    • 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持。
    • DATE 列统计信息,在使用 Hive-1.2.0 及更高版时支持。
    • 使用 Hive-2.0.x 版本时不支持写入 ORC 表。

    依赖项

    与 Hive 集成,需要在 Flink 下的 /lib/ 目录中添加一些额外的依赖包,以便通过 Table API 或 SQL Client 与 Hive 进行交互。或者,可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的 -C 或 -l 选项将它们添加到 classpath 中。

    Apache Hive 是基于 Hadoop 之上构建的,需要设置 Hadoop 环境变量

    export HADOOP_CLASSPATH=`hadoop classpath`
    

    有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar 包。可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包(如果使用的 Hive 版本没有在下方列出,则这种方法会更适合)。

    建议优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足需求时,再考虑使用分开添加 jar 包的方式。

    使用 Flink 提供的 Hive jar

    下表列出了所有可用的 Hive jar。可以选择一个并放在 Flink 的 /lib/ 目录中。

    Metastore version Maven dependency
    1.0.0 - 1.2.2 flink-sql-connector-hive-1.2.2
    2.0.0 - 2.2.0 flink-sql-connector-hive-2.2.0
    2.3.0 - 2.3.6 flink-sql-connector-hive-2.3.6
    3.0.0 - 3.1.2 flink-sql-connector-hive-3.1.2

    Maven 依赖

    构建应用程序,则需要在 mvn 文件中添加以下依赖项。 应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们(scope: provide)。

    <!-- Flink Dependency -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive{{ site.scala_version_suffix }}</artifactId>
      <version>{{site.version}}</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge{{ site.scala_version_suffix }}</artifactId>
      <version>{{site.version}}</version>
      <scope>provided</scope>
    </dependency>
    
    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>
    

    连接到Hive

    通过 TableEnvironment 或者 YAML 配置,连接到现有的 Hive 集群。

    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    
    String name            = "myhive";
    String defaultDatabase = "mydatabase";
    String hiveConfDir     = "/opt/hive-conf";
    
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog("myhive", hive);
    
    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog("myhive");
    

    Hive Catalog

    在 Hadoop 生态系统中,Hive Metastore 多年来已经演变成事实上的元数据中心。

    对于同时使用 Hive 和 Flink 的用户,HiveCatalog 允许使用 Hive Metastore 管理 Flink 的元数据。

    对于只适用 Flink 的用户,HiveCatalog 是 Flink 唯一支持的开箱即用的持久化 Catalog。

    如果没有持久化 Catalog,用户在创建 Kafka Table 时需要重复在每个应用中使用 Flink Create SQL 配置元数据。HiveCatalog 实现用户只创建一次表和元对象,并在以后的会话中方便地引用和管理它们。

    使用 HiveCatalog

    HiveCatalog可用于处理两种类型的表:Hive兼容表和通用表。 就存储层中的元数据和数据而言,兼容 Hive 的表是以兼容 Hive 的方式存储的表。 因此,通过 Flink 创建的 Hive 兼容表可以从 Hive 端查询。

    另一方面,通用表特定于 Flink。 使用 HiveCatalog 创建通用表时,只是使用 HMS 来保留元数据。虽然这些表格对 Hive 可见,但 Hive 不太可能能够理解元数据。因此,在 Hive 中使用此类表会导致未定义的行为。

    下面将通过一个简单的例子,演示将 Kafa 作为数据源,并将元数据保存到 Hive metastore 中,使用 Flink SQL 直接读取 Kafka。

    step1. 确保 Hive Metastore 可用

    安装 Hive 环境,设置 Hive 的 Metastore 配置(hive-site.xml 文件)。使用 Hive CLI 测试

    hive> show databases;
    OK
    default
    Time taken: 0.032 seconds, Fetched: 1 row(s)
    
    hive> show tables;
    OK
    Time taken: 0.028 seconds, Fetched: 0 row(s)
    

    step2. 配置 Flink 集群和 SQL CLI

    将 Hive 依赖 jar 添加到 Flink 安装目录下的 /lib 目录中。修改 SQL CLI 的 YARM 配置(../conf/sql-cli-defaults.yaml)

    execution:
        planner: blink
        type: streaming
        ...
        current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
        current-database: mydatabase
        
    catalogs:
       - name: myhive
         type: hive
         hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
    

    step3. 确保 Kafka 集群可用

    step4. 启动 SQL Client,使用 Flink SQL DDL 创建 kafka table

    Flink SQL> CREATE TABLE mykafka (name String, age Int) WITH (
       'connector.type' = 'kafka',
       'connector.version' = 'universal',
       'connector.topic' = 'test',
       'connector.properties.bootstrap.servers' = 'localhost:9092',
       'format.type' = 'csv',
       'update-mode' = 'append'
    );
    [INFO] Table has been created.
    
    Flink SQL> DESCRIBE mykafka;
    root
     |-- name: STRING
     |-- age: INT
    

    使用 Hive CLI 测试是否可以使用该表

    hive> show tables;
    OK
    mykafka
    Time taken: 0.038 seconds, Fetched: 1 row(s)
    
    hive> describe formatted mykafka;
    OK
    # col_name              data_type               comment
    ...
    
    # Detailed Table Information
    Database:               default
    Table Type:             MANAGED_TABLE
    Table Parameters:
        flink.connector.properties.bootstrap.servers    localhost:9092
        flink.connector.topic   test
        flink.connector.type    kafka
        flink.connector.version universal
        flink.format.type       csv
        flink.generic.table.schema.0.data-type  VARCHAR(2147483647)
        flink.generic.table.schema.0.name   name
        flink.generic.table.schema.1.data-type  INT
        flink.generic.table.schema.1.name   age
        flink.update-mode       append
        is_generic              true
    

    step5. 运行 Flink SQL 查询该表

    Flink SQL> select * from mykafka;
                 SQL Query Result (Table)
     Refresh: 1 s    Page: Last of 1     
    
            name                       age
             tom                        15
            john                        21
           kitty                        30
             amy                        24
           kaiky                       18
    

    Hive Dialect

    从 1.11.0 开始,在使用 Hive Dialect 时,Flink 允许用户用 Hive 语法来编写 SQL 语句。旨在改善与 Hive 的互操作性,并减少在 Flink 和 Hive 之间切换来执行不同语句的情况。

    使用 Hive Dialect

    Flink 目前支持两种 SQL Dialect:default 和 hive。需要先切换到 Hive 方言,然后才能使用 Hive 语法编写。下面介绍如何使用 SQL 客户端和 Table API 设置方言。可以为执行的每个语句动态切换方言。无需重新启动会话即可使用其他方言。

    SQL Client

    以通过 table.sql-dialect 属性指定。修改 SQL CLI 的 YARM 配置(../conf/sql-cli-defaults.yaml)

    execution:
      planner: blink
      type: batch
      result-mode: table
    
    configuration:
      table.sql-dialect: hive
    

    可以在 SQL 客户端启动后设置方言。

    Flink SQL> set table.sql-dialect=hive; -- to use hive dialect
    [INFO] Session property has been set.
    
    Flink SQL> set table.sql-dialect=default; -- to use default dialect
    [INFO] Session property has been set.
    

    Table API

    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);
    
    // to use hive dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    // to use default dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    

    注意事项

    以下是使用 Hive 方言的一些注意事项:

    • Hive 方言只能用于操作 Hive 对象,并要求当前 Catalog 是一个 HiveCatalog。
    • Hive 方言只支持 db.table 这种两级的标识符,不支持带有 Catalog 名字的标识符。
    • 虽然所有 Hive 版本支持相同的语法,但是一些特定的功能是否可用仍取决于使用的 Hive 版本。
    • 执行 DML 和 DQL 时应该使用 HiveModule 。

    Hive Read & Write

    使用 HiveCatalog,Flink 可以统一 Hive 表的批处理和流处理。

    Reading

    Flink 支持以批处理和流处理模式从 Hive 读取数据。当作为批处理应用程序运行时,Flink 会处理的执行查询时的表数据。流式读取将持续监视表的更新,在新数据可用时以增量方式即时获取。默认情况,Flink 视为读取有界的表。

    流式读取支持使用分区和非分区表。对于分区表,Flink 将监视新分区的生成,并在可用时以增量方式读取。对于非分区表,Flink 将监视文件夹中新文件的生成,并以增量方式读取新文件。

    使用 SQL Hints 可以应用以下配置(无需修改 Hive Metastore 中的定义)

    SELECT * 
    FROM hive_table 
    /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
    
    Key Default Type Description
    streaming-source.enable false Boolean 是否开启流式读取 Hive 数据。默认是批处理模式。
    streaming-source.partition.include all 可选值['all', 'latest'] 设置要读取的分区,默认读所有分区。 可选项:latest:只读取最新分区数据all:读取全量分区数据latest 只能用在 temporal join 中,用于读取最新分区作为维表。
    streaming-source.monitor-interval None Duration 监听新分区生成的时间。 Hive streaming reading 默认间隔是1分钟。Hive streaming temporal join 默认间隔是1小时。 目前的实现是每个 TaskManager 都会查询 Hive Metastore,高频的查询可能会对 Hive Metastore 产生过大的压力。
    streaming-source.partition-order partition-name 可选值['partition-name', 'create-time', 'partition-time'] 加载分区顺序 可选项:partition-name:使用默认分区名称顺序加载最新分区create-time:使用分区文件创建时间顺序partition-time:使用分区时间顺序
    streaming-source.consume-start-offset None String 流式读取 Hive 表的起始偏移量。 取决于设置的分区顺序对于 create-time 和 partition-time,应该是时间字符串(yyyy-[m]m-[d]d [hh:mm:ss])对于 partition-time,应该是分区名称字符串(例如pt_year=2020/pt_mon=10/pt_day=01)

    需要注意的

    • 监视策略会扫描当前位置路径中的所有目录/文件。太多的分区可能会导致性能下降。
    • 非分区表的流式读取要求将每个文件的生产者以原子方式写入目标目录。
    • 对分区表的流式读取要求每个分区都应以原子方式添加到 Hive Metastore 视图中。
    • 流读取不支持 Flink DDL 中的水印语法。这些表不能使用窗口运算。

    Reading Hive Views

    Flink 可以从 Hive 定义的视图中读取,有以下限制:

    1. 在查询视图之前,必须将 Hive catalog 设置为当前目录。可以通过 Table API 的 tableEnv.useCatalog() 或 SQL 语法 USE CATALOG 来完成。
    2. Hive 和 Flink 有不同的 SQL 语法。确保视图的查询与 Flink 语法兼容。

    Vectorized Optimization upon Read

    当满足以下条件时,Flink 将自动使用列式读取优化:

    • Table Format:ORC 或 Parquet
    • Column 没有复杂字段(List、Map、Struct、Union)

    默认情况下是开启的,可以通过下面的配置关闭

    table.exec.hive.fallback-mapred-reader=true
    

    Source Parallelism Inference

    默认情况下,Flink 将根据文件数和每个文件中的块数推断出 Hive Reader 的最佳并行度。

    Flink 也支持灵活地配置并行推理策略。可以在 TableConfig 中配置以下参数(请注意,这些参数会影响 Job 的所有 Source):

    Key Default Type Description
    table.exec.hive.infer-source-parallelism true Boolean 如果为 true,根据文件数和文件块推断并行度。如果为 false,则由 config 设置源的并行度。
    table.exec.hive.infer-source-parallelism.max 1000 Integer 设置 Source 的最大推断并行度。

    Temporal Table Join

    可以将 Hive 表用作时态表(Temporal Table),流可以通过时态联接(Temporal Join)关联 Hive 表。

    Flink 支持处理时间时态连接 Hive 表,处理时间时态连接总是关联最新版本的时态表。

    Flink 支持 Hive 分区表和非分区表的时态连接,对于分区表,Flink 支持自动跟踪 Hive 表的最新分区。

    Flink 还不支持事件时间临时连接 Hive 表。

    Temporal Join The Latest Partition

    对于一个随时间变化的分区表,可以把读作一个无界流,每个分区可以作为时态表的一个版本(如果每个分区都包含一个版本的完整数据)。表的最新版本保留 Hive 表的分区数据。

    Flink 支持在处理时间时态连接时自动跟踪时态表的最新分区(版本),最新分区(版本)由 streaming source.partition-order 选项定义。

    此功能仅在 Flink 流处理模式下支持。

    下面代码演示了一个经典的业务处理,维度表来自 Hive,每天由批处理作业更新一次,kafka 流来自实时在线业务数据或日志,需要与维度表连接以丰富流。

    假设 Hive 表中的数据每天更新,每天都包含最新的完整维度数据。

    SET table.sql-dialect=hive;
    CREATE TABLE dimension_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP(3),
      update_user STRING,
      ...
    ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
      -- 使用默认分区名顺序(partition-name),每12小时加载一次最新的分区(推荐用法)
      'streaming-source.enable' = 'true',
      'streaming-source.partition.include' = 'latest',
      'streaming-source.monitor-interval' = '12 h',
      'streaming-source.partition-order' = 'partition-name', 
    
      -- 使用分区文件创建时间顺序,每12小时加载一次最新的分区
      'streaming-source.enable' = 'true',
      'streaming-source.partition.include' = 'latest',
      'streaming-source.partition-order' = 'create-time',
      'streaming-source.monitor-interval' = '12 h'
    
      -- 使用分区时间顺序,每12小时加载一次最新的分区
      'streaming-source.enable' = 'true',
      'streaming-source.partition.include' = 'latest',
      'streaming-source.monitor-interval' = '12 h',
      'streaming-source.partition-order' = 'partition-time',
      'partition.time-extractor.kind' = 'default',
      'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
    );
    

    创建 Kafka Table

    SET table.sql-dialect=default;
    CREATE TABLE orders_table (
      order_id STRING,
      order_amount DOUBLE,
      product_id STRING,
      log_ts TIMESTAMP(3),
      proctime as PROCTIME()
    ) WITH (...);
    

    实现 Temporal Join 查询丰富流

    SELECT * FROM orders_table AS o 
    JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
    ON o.product_id = dim.product_id;
    

    Temporal Join The Latest Table

    对于 Hive 表,可以把读作一个有界流。在这种情况下,Hive 表只能在查询时跟踪其最新版本。表的最新版本保留 Hive 表的所有数据。

    在对最新的 Hive 表执行时态联接时,Hive 表将被缓存在 Slot 内存中,并且流中的每条记录都与表联接进行匹配项。可以使用以下属性配置 Hive 表缓存的 TTL。缓存过期后,将再次扫描 Hive 表以加载最新数据。

    Key Default Type Description
    lookup.join.cache.ttl 60 min Duration 缓存生存时间,默认60分钟。 配置只有再查找有界 Hive Table 时有效。

    下面的演示将 hive 表的所有数据作为时态表加载。

    -- 假设 Hive 表中的数据被批处理以 overwrite 的形式生成。
    SET table.sql-dialect=hive;
    CREATE TABLE dimension_table (
      product_id STRING,
      product_name STRING,
      unit_price DECIMAL(10, 4),
      pv_count BIGINT,
      like_count BIGINT,
      comment_count BIGINT,
      update_time TIMESTAMP(3),
      update_user STRING,
      ...
    ) TBLPROPERTIES (
      'streaming-source.enable' = 'false',           
      'streaming-source.partition.include' = 'all',  
      'lookup.join.cache.ttl' = '12 h'
    );
    
    SELECT * FROM orders_table AS o 
    JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
    ON o.product_id = dim.product_id;
    

    需要注意:

    • 每个 join 子任务都需要保留一份缓存。确保 Hive 表数据可以放入 TM Slot 的内存中。
    • 建议为 streaming source.monitor-interval 或 lookup.join.cache.ttl 设置一个相对较大的值。否则,表过于频繁地更新和重新加载,影响性能。
    • 一旦缓存需要刷新,就会重新加载整个 Hive 表。没有办法区分新旧数据。

    Writing

    Flink 支持以批处理和流处理模式向 Hive 写入数据。

    当作为批处理运行时,Flink 将只在作业完成时才向 Hive 表写入这些记录。既支持 Append,也支持 Overwrite。

    # ------ Append 追加数据
    Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
    
    # ------ Overwrite 覆盖数据
    Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
    

    数据也可以插入到特定的分区中。

    # ------ 插入静态分区
    Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') 
    SELECT 'Tom', 25;
    
    # ------ 插入动态分区
    Flink SQL> INSERT OVERWRITE myparttable 
    SELECT 'Tom', 25, 'type_1', '2019-08-08';
    
    # ------
    Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') 
    SELECT 'Tom', 25, '2019-08-08';
    

    流式写入不断以增量方式向 Hive 添加新数据,提交使其可见。用户通过多个属性控制何时/如何触发提交。流式写入不支持覆盖插入。

    下面的示例演示将 Kafka 中的数据写入有分区的 Hive 表,并运行批处理查询将数据读出。

    SET table.sql-dialect=hive;
    CREATE TABLE hive_table (
      user_id STRING,
      order_amount DOUBLE
    ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.trigger'='partition-time',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );
    
    SET table.sql-dialect=default;
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      log_ts TIMESTAMP(3),
      -- Watermark 定义在 TIMESTAMP 类型字段上
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND 
    ) WITH (...);
    
    -- 查询 kafka 插入 Hive
    INSERT INTO TABLE hive_table 
    SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
    FROM kafka_table;
    
    -- 查询 Hive table
    SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
    

    如果 Watermark 定义在 TIMESTAMP_LTZ 类型字段上,sink.partition-commit.watermark-time-zone 需要配置时区(如果使用 partition-time 作为提交策略)

    SET table.sql-dialect=hive;
    CREATE TABLE hive_table (
      user_id STRING,
      order_amount DOUBLE
    ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
      'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
      'sink.partition-commit.trigger'='partition-time',
      'sink.partition-commit.delay'='1 h',
      -- 配置时区
      'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -
      'sink.partition-commit.policy.kind'='metastore,success-file'
    );
    
    SET table.sql-dialect=default;
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      ts BIGINT, 
      ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
      WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND 
    ) WITH (...);
    

    Formats

    Flink 支持 Hive 使用以下数据格式:Text、CSV、SequenceFile、ORC、Parquet

    Hive Functions

    Built-in Function

    HiveModule 提供了 Hive 可以使用的内置函数

    tableEnv.loadModue("myhive", new HiveModule("2.3.4"));
    

    User Defined Function

    用户可以在 Flink 中使用 Hive UDF,支持的 UDF 类型包括:

    • UDF
    • Generic UDF
    • Generic UDTF
    • UDAF
    • GenericUDAFResolver2

    Hive 的 UDF 和 GenericUDF 自动转换为 Flink 的 ScalarFunction,Hive 的 GenericUDTF 自动转换为 Flink 的 TableFunction,Hive 的 UDAF 和 GenericUDAFResolver2 转换为 Flink 的 AggregateFunction。

    要使用 Hive UDF,用户必须:

    • 设置 HiveCatalog 包含该函数作为会话的当前目录
    • 在 Flink 的类路径中包含包含该函数的jar
    • 使用 Blink Planner

    创建一个 UDF,使用时注册名 “myudf”

    public class TestHiveSimpleUDF extends UDF {
    
        public IntWritable evaluate(IntWritable i) {
            return new IntWritable(i.get());
        }
    
        public Text evaluate(Text text) {
            return new Text(text.toString());
        }
    }
    

    创建一个 Generic UDF,使用时注册名 “mygenericudf”

    public class TestHiveGenericUDF extends GenericUDF {
    
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            // ...
        }
    
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            return arguments[0].get();
        }
    
        @Override
        public String getDisplayString(String[] children) {
            return "TestHiveGenericUDF";
        }
    }
    

    创建一个 Generic UDTF,使用时注册名 “mygenericudtf”

    public class TestHiveUDTF extends GenericUDTF {
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
                 // ...
        }
    
        @Override
        public void process(Object[] args) throws HiveException {
            String str = (String) args[0];
            for (String s : str.split(",")) {
                forward(s);
                forward(s);
            }
        }
    
        @Override
        public void close() {
        }
    }
    

    在 Hive CLI 中查看

    hive> show functions;
    OK
    mygenericudf
    myudf
    myudtf
    

    在 Flink SQL 中使用

    select 
        mygenericudf(myudf(name), 1) as a, 
        mygenericudf(myudf(age), 1) as b, 
        s 
    from mysourcetable, 
    lateral table(myudtf(name, 1)) as T(s);
    

    相关文章

      网友评论

        本文标题:Flink SQL Connector(二)- JDBC & H

        本文链接:https://www.haomeiwen.com/subject/zzyoeltx.html