Flink 使用之操作 Hudi 表

作者: AlienPaul | 来源:发表于2021-10-14 18:01 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    前言

    因业务要求对采集来的数据进行统一存储,因此引入了Flink CDC - Hudi方案。Flink CDC在本人之前的博客已有介绍(参见Flink 使用之 MySQL CDC
    )。本篇重点介绍Flink SQL结合Hudi的使用方法。Hudi表使用Flink SQL操作,为了便于业务人员使用,我们为其提供Zeppelin,能够以可视化的方式编写并执行Flink作业,同时还可以图形化展示数据分析结果。

    软件版本

    我们使用的软件和版本如下所示:

    • Flink:1.12.2
    • Hudi:0.9
    • Zeppelin:0.10.0

    首先我们配置Flink Hudi环境。

    下载编译Hudi

    找一台已经安装了maven的服务器。执行:

    git clone https://github.com/apache/hudi.git
    

    源代码clone成功之后,切换分支到origin/release-0.9.0。接着执行编译命令:

    mvn clean package -DskipTests
    

    等待编译完成。

    编译完成之后,Flink hudi bundle的编译输出在hudi/packaging/hudi-flink-bundle/target,Flink SQL支持Hudi所需jar包就在这个目录,将其复制走备用。

    使用Flink SQL Client的方法执行Hudi SQL

    在这一步我们使用Flink on yarn的方式启动Flink SQL Client,然后通过它操作Hudi表。

    首先我们下载Flink 1.12.2并解压。

    wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz
    tar -zxvf flink-1.12.2-bin-scala_2.11.tgz
    

    然后配置HADOOP的环境变量:

    export HADOOP_CLASSPATH=`hadoop classpath`
    

    下面的操作需要使用具有HDFS读写权限的用户执行。

    编辑conf/flink-conf.yaml,修改如下内容:

    taskmanager.numberOfTaskSlots: 4
    

    然后修改conf/worker,如下所示:

    localhost
    localhost
    localhost
    localhost
    

    这样子配置,启动standlone集群时,会在本地启动4个TaskManager。

    接下来启动Flink SQL Client:

    ./sql-client.sh embedded -j /path/to/hudi-flink-bundle_xxxxxx.jar
    

    注意,-j后面是Hudi Flink bundle jar包。

    成功进入Flink SQL Client之后,我们执行插入测试数据的SQL:

    CREATE TABLE t1(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts TIMESTAMP(3),
      `partition` VARCHAR(20)
    )
    PARTITIONED BY (`partition`)
    WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///zy/hudi/',
      'table.type' = 'MERGE_ON_READ'
    );
    
    -- insert data using values
    INSERT INTO t1 VALUES
      ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
      ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
      ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
      ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
      ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
      ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
      ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
      ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
    

    稍等一段时间后执行如下SQL,检查插入的数据:

    select * from t1;
    

    如果能查询出刚才插入的数据,说明Flink Hudi运行环境配置无误。

    通过Zeppelin使用Flink Hudi

    Flink SQL Client可以让用户直接通过SQL方式创建流处理作业,不再需要编写Java/Scala代码,但是它仍然基于命令行,对业务人员不够友好。为了方便业务人员使用,我们引入了Zeppelin。Zeppelin提供了交互数据分析和可视化功能,易用性能够满足业务人员的需求。

    注意:Zeppelin,Flink和Hudi三者之间存在版本兼容问题。本人目前验证了Zeppelin 0.10.0,Flink 1.12.2和Hudi 0.9。试用其他版本过程遇到了些奇怪的问题。所以说其他版本组件请谨慎操作。

    下面我们开始部署和使用Zeppelin。

    安装和配置Zeppelin

    在之前安装Flink的服务器上,下载Zeppelin 0.10.0版本:

    wget https://dlcdn.apache.org/zeppelin/zeppelin-0.10.0/zeppelin-0.10.0-bin-all.tgz
    

    下载完毕后将其解压,由于Zeppelin必须使用Java8 151版本之后的JDK,如果系统自带的JDK不满足要求,需要专门为Zeppelin指定JDK。JDK满足要求的可以略过此步骤。

    cd zeppelin-0.10.0-bin-all/conf/
    cp zeppelin-env.sh.template zeppelin-env.sh
    

    然后编辑zeppelin-env.sh,加入一行:

    export JAVA_HOME=/path/to/jdk8u302-b08
    

    指定Zeppelin专属的JDK。

    Zeppelin默认的端口号是8080,如果需要修改的话,先创建一个zeppelin-site.xml文件:

    cd zeppelin-0.10.0-bin-all/conf/
    cp zeppelin-site.xml.template zeppelin-site.xml
    

    修改如下内容:

    <property>
      <name>zeppelin.server.addr</name>
      <value>0.0.0.0</value>
      <description>Server binding address</description>
    </property>
    
    <property>
      <name>zeppelin.server.port</name>
      <value>9999</value>
      <description>Server port.</description>
    </property>
    

    例子中我们没有绑定IP,端口号修改成了9999。

    最后我们通过如下命令启动Zeppelin服务:

    cd zeppelin-0.10.0-bin-all/bin
    ./zeppelin-daemon.sh start
    

    紧接着打开浏览器输入http://目标IP:9999,如果能够打开Zeppelin页面,说明配置无误。如果无法打开,说明Zeppelin配置或者环境出现了问题。可以查看Zeppelin的运行日志,Zeppelin的运行日志位于zeppelin-0.10.0-bin-all/logs目录。

    Zeppelin主界面

    配置Zeppelin的interpreter

    Zeppelin具有非常多的interpreter。interpreter为Zeppelin的插件,用于支持各种各样的编程语言和数据处理后端,例如Hive,Spark和Flink等。

    在Zeppelin中使用Flink,就必须依赖Flink interpreter,自然也离不开配置。我们重点关注几个核心的配置项。点击Zeppelin右上角的菜单,选择interpreter,在新页面的搜索框处输入flink,可以很方便的找到flink interpreter和他的配置。

    flink interpreter 配置页面

    最为重要的几个配置项为:

    • FLINK_HOME:Flink的安装目录,必须要配置。
    • HADOOP_CONF_DIR:Hadoop配置文件目录,如果Flink作业运行使用yarn模式,或者是使用HDFS,必须配置此项。
    • flink.execution.mode:Flink的运行模式,可以选择local,remote或者yarn。local为本地运行,remote需要连接远端集群(需要配置``flink.execution.remote.hostflink.execution.remote.port`,即远端Flink集群JobManager所在的host和port),yarn为提交作业到yarn集群。本例子中我们使用yarn模式。
    • flink.execution.jars:执行Flink作业依赖的其他jar包,可以配置本地文件路径或者是HDFS上的路径,多个文件使用逗号分隔。在这个例子中我们需要使用hudi,因此需要配置hudi-flink-bundle_xxxxxx.jar的全路径。

    配置完毕之后,我们点击flink interpreter栏右上方的saverestart,使配置项生效。

    验证Zeppelin Flink interpreter是否配置正确

    接下来是验证步骤,我们打开Notebook菜单,选择Flink Tutorial -> Flink Basics。找到下方的Batch WordCount,点击右侧的运行按钮。如果下方能看到WordCount运行结果,说明Flink interpreter配置无误。

    Flink WordCount运行成功页面

    创建Note并通过Flink SQL操作Hudi表

    点击Notebook菜单,选择Create new note,在弹出的对话框中填写note的名称,选择默认的interpreter为Flink,点击create按钮。

    在编写Flink SQl之前,我们需要先写提示符,用来告诉Zeppelin需要怎样解析我们的编程脚本,提示符共有以下5种:

    • %flink - 创建ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment 并且提供Scala环境
    • %flink.pyflink - 提供Python环境
    • %flink.ipyflink - 提供ipython环境
    • %flink.ssql - 提供流处理SQL环境
    • %flink.bsql - 提供批处理SQL环境

    这里我们使用%flink.ssql提示符,填写入前面Hudi的测试SQL并执行,如果能够正常创建Hudi表,插入数据并查询出。说明配置无误。Zeppelin可以正常使用。

    Flink 操作Hudi表

    使用Hive metastore

    前面例子中表的元数据是在内存中保存的,如果Flink yarn session退出,表的元数据会丢失。下次使用的时候需要再次创建表,非常不便于使用。在这一节我们打算使用Hive的metastore作为元数据容器。表元数据保存在Hive的metastore中是一种方便的多的方案。表元数据不会因为Flink session的停止而丢失。

    首先我们需要检查配合使用的hive的版本。在Flink安装目录的lib中添加对应的依赖。Hive版本和对应依赖请参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/

    本例子中我们使用Hive 3.1.0 配合Flink 1.12.2使用。需要准备如下文件:

    • flink-connector-hive_2.11-1.12.2.jar
    • hive-exec-3.1.0.jar
    • libfb303-0.9.3.jar
    • antlr-runtime-3.5.2.jar

    第一个文件我们可以从中央仓库下载,后面3个文件在hive安装目录能够找到。

    接下来我们将这4个文件放置在Flink的lib目录中:

    cd /path/to/flink-1.12.2/lib
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.2/flink-connector-hive_2.11-1.12.2.jar
    
    cd /path/to/hive/lib
    cp hive-exec-3.1.0.jar /path/to/flink-1.12.2/lib/
    cp libfb303-0.9.3.jar /path/to/flink-1.12.2/lib/
    cp antlr-runtime-3.5.2.jar /path/to/flink-1.12.2/lib/
    

    然后重启Zeppelin的Flink interpreter。

    接下来我们创建Hive catalog。打开前一节创建的Flink note,执行如下SQL语句:

    %flink.ssql
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'default',
        'hive-conf-dir' = '/path/to/hive/conf'
    );
    

    这条SQL语句创建出一个Hive catalog,具体使用Hive的哪个database我们可以提前使用beeline查询好。其中hive-conf-dir最为重要,必须要指定Hive配置文件所在的目录(一般是Hive安装路径下的conf目录)。创建Hive Catalog成功的截图如下:

    创建Hive Catalog

    然后我们执行下面的SQL,测试下Flink能否获取到Hive中default数据库下的表。

    %flink.ssql
    show catalogs;
    use catalog myhive;
    show tables;
    

    执行成功的输出如下图所示(table部分未截图):


    Show Catalogs

    如果到这一步能够列出Hive的catalog和管理的tables,说明前面步骤操作无误,可以进行下一步,使用将Hudi表交给Hive catalog管理。

    我们再次执行第一节中的测试SQL。观察Zeppelin的输出。


    创建Hudi表

    虽然这里执行成功了,但是本人清理环境后反复测试这条SQL的时候遇到了错误:


    错误信息

    比较诡异。本人使用Flink SQL client执行均无问题,检查Zeppelin Flink Session的classpath,发现hive-exec包已经加载,应该不存在问题才对。怀疑是Zeppelin和Flink版本兼容存在问题。待得到初步解决方案后本人将更新此博客。

    使用 Hive sync

    Hive sync模式Flink会使用Hive的metastore,同时还保持同步,通过Flink维护的Hudi表也能够通过Hive查询。

    启用Hive sync需要重新编译Hudi,因为Hudi默认编译参数是不包含Hive相关依赖的。在编译之前我们必须要确定配合使用的Hive的版本。这里以Hive3.1.0为例。修改hudi/packaging/hudi-flink-bundle/pom.xml文件,找到如下部分:

    <profile>
      <id>flink-bundle-shade-hive3</id>
      <properties>
        <hive.version>3.1.0</hive.version>
        <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
      </properties>
      <dependencies>
        <dependency>
          <groupId>${hive.groupid}</groupId>
          <artifactId>hive-service-rpc</artifactId>
          <version>${hive.version}</version>
          <scope>${flink.bundle.hive.scope}</scope>
        </dependency>
      </dependencies>
    </profile>
    

    在这一段中,修改hive的版本为实际使用的版本。

    然后进入hudi项目根目录,执行如下命令编译

    mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Pinclude-flink-sql-connector-hive
    

    最后复制编译输出到Flink的lib目录:

    cp /opt/zy/hudi/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar /flink-1.12.2/lib/
    

    注意:如果Flink目录中已经有flink-connector-hive的jar包,请务必移除,否则会出现依赖冲突。

    整理好的Flink lib目录如下所示:

    flink-csv-1.12.2.jar
    flink-dist_2.11-1.12.2.jar
    flink-hadoop-compatibility_2.11-1.12.2.jar
    flink-json-1.12.2.jar
    flink-shaded-zookeeper-3.4.14.jar
    flink-table_2.11-1.12.2.jar
    flink-table-blink_2.11-1.12.2.jar
    hudi-flink-bundle_2.11-0.9.0-hive.jar
    log4j-1.2-api-2.12.1.jar
    log4j-api-2.12.1.jar
    log4j-core-2.12.1.jar
    log4j-slf4j-impl-2.12.1.jar
    

    接着我们需要处理Hive的依赖。这一步如果忘了做,后面使用Hive查询Hudi表的时候,会报如下错误:

    Error: Error while compiling statement: FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat (state=42000,code=40000)
    

    我们进入Hudi的packaging/hudi-hadoop-mr-bundle/target目录,复制hudi-hadoop-mr-bundle-0.9.0.jar到Hive安装目录的auxlib下。记得重启Hive所有服务。

    注意:如果使用HDP,请务必复制hudi-hadoop-mr-bundle-0.9.0.jar到hiveserver2所在机器的auxlib目录,否则仍然会报ClassNotFoundException。

    到这一步Hive sync已经配置完毕,接下来我们验证Hive sync的功能。

    首先进入Flink SQL client,启动之前记得执行下面脚本:

    export HADOOP_CLASSPATH=`hadoop classpath`
    

    然后执行如下SQL:

    CREATE TABLE t1(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts TIMESTAMP(3),
      `partition` VARCHAR(20)
    )
    PARTITIONED BY (`partition`)
    WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///zy/hudi',
      'table.type' = 'COPY_ON_WRITE',
      'hive_sync.enable' = 'true',
      'hive_sync.mode' = 'hms',
      'hive_sync.metastore.uris' = 'thrift://sizu02:9083',
      'hive_sync.table'='t1', 
      'hive_sync.db'='default'
    );
    

    增加的几个重要配置项的解释如下:

    • table.type: 测试中我们使用COPY_ON_WRITE。如果使用MERGE_ON_READ,在生成parquet文件之前,Hive查询不到数据
    • hive_sync.enable: 是否启用hive同步
    • hive_sync.mode: hive同步模式,包含hms和jdbc两种,这里使用hms模式
    • hive_sync.metastore.uris: 配置hive metastore的URI
    • hive_sync.table: 同步到hive中的表名称
    • hive_sync.db: 同步到hive的哪个数据库中

    然后插入测试数据:

    INSERT INTO t1 VALUES
      ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
      ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
      ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
      ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
      ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
      ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
      ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
      ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
    

    然后执行

    select * from t1;
    

    可以成功查询出数据。

    最后我们测试下通过Hive的beeline查询数据:

    set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
    
    select * from t1;
    

    结果如下:

    +-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+
    | t1._hoodie_commit_time  | t1._hoodie_commit_seqno  | t1._hoodie_record_key  | t1._hoodie_partition_path  |                t1._hoodie_file_name                | t1.uuid  | t1.name  | t1.age  | t1.ts  | t1.partition  |
    +-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+
    | 20211019164113          | 20211019164113_3_1       | id1                    | par1                       | 4c10d680-7b18-40c9-952e-75435101cb55_3-4-0_20211019164113.parquet | id1      | Danny    | 23      | 1000   | par1          |
    | 20211019164113          | 20211019164113_3_2       | id2                    | par1                       | 4c10d680-7b18-40c9-952e-75435101cb55_3-4-0_20211019164113.parquet | id2      | Stephen  | 33      | 2000   | par1          |
    | 20211019164113          | 20211019164113_1_3       | id3                    | par2                       | 9e94999f-0ce5-4741-b579-3aa0d5ac5f1b_1-4-0_20211019164113.parquet | id3      | Julian   | 53      | 3000   | par2          |
    | 20211019164113          | 20211019164113_1_4       | id4                    | par2                       | 9e94999f-0ce5-4741-b579-3aa0d5ac5f1b_1-4-0_20211019164113.parquet | id4      | Fabian   | 31      | 4000   | par2          |
    | 20211019164113          | 20211019164113_0_1       | id5                    | par3                       | fa2c57a6-6573-477d-af06-6b8f3a34f8da_0-4-0_20211019164113.parquet | id5      | Sophia   | 18      | 5000   | par3          |
    | 20211019164113          | 20211019164113_0_2       | id6                    | par3                       | fa2c57a6-6573-477d-af06-6b8f3a34f8da_0-4-0_20211019164113.parquet | id6      | Emma     | 20      | 6000   | par3          |
    | 20211019164113          | 20211019164113_2_1       | id7                    | par4                       | 2162e217-8d2e-4f2c-bd8b-7d76e213a1f1_2-4-0_20211019164113.parquet | id7      | Bob      | 44      | 7000   | par4          |
    | 20211019164113          | 20211019164113_2_2       | id8                    | par4                       | 2162e217-8d2e-4f2c-bd8b-7d76e213a1f1_2-4-0_20211019164113.parquet | id8      | Han      | 56      | 8000   | par4          |
    +-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+
    

    在Zeppelin中使用Flink Hudi Hive Sync

    上面的环境如果直接在Zeppelin中运行会出现很多报错,首先需要处理依赖问题。复制hadoop mapreduce的相关jar包到flink的lib目录中。这里以HDP的为例:

    cp hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar /path/to/flink-1.12.2/lib/
    

    然后配置Zeppelin的flink interpreter,不要勾选zeppelin.flink.module.enableHive配置项,否则会出现插入的数据无法在Hive查询到的问题。意思也就是说,不需在flink interpreter中配置任何hive相关的内容。接下来按照上一节的操作,可以在Zeppelin中完美使用。

    相关文章

      网友评论

        本文标题:Flink 使用之操作 Hudi 表

        本文链接:https://www.haomeiwen.com/subject/iikuoltx.html