美文网首页
Flink_StreamingFileSink-实时数据写入HD

Flink_StreamingFileSink-实时数据写入HD

作者: Eqo | 来源:发表于2022-08-31 09:12 被阅读0次

Flink DataStream中将流数据保存HDFS文件系统方式:

  • 第一种方式:自定义Sink,实现RichSinkFunction

    • 使用JDBC的方式将数据写入到Hive数据库中,这种方式效率比较低
    • 原因在于:INSERT INTO 插入数据,底层运行MapReduce程序,所以不推荐使用,了解即可。
  • 第二种方式:StreamingFileSink Connector

    • 流式写入HDFS文件,吞吐量较高

StreamingFileSink 实现

数据落地HDFS,使用Flink DataStream中自带Connector:StreamingFileSink,将分区文件写入到支持 [Flink FileSystem]
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/filesystems/index.html) 接口的文件系统中。

概念
1.StreamingFileSink 会将数据写入Bucket桶中(可以理解为Hive的分区目录dt=yyyy-DD-mm)
2.每个子任务写入数据的时候,都有一个单独的文件.
3.写入的文件有三种状态
- finshed 可读取的文件
- pending-flie 写入完成的文件,但不能读取
- in-process 正在写入文件

设置
创建StreamingFlieSink实例对象有意向四个设置

  • 数据存储格式
    row行 bulk(列示)(avro,orc Parquet )

  • 分桶策略(十分重要)
    分区目录名称,

  • 文件滚动策略
    按照文件大小和超时时间滚动文件

  • 文件名称
    文件前缀 文件名后缀

重要

  • 在使用StreamingFileSink 必须开启 Chckpoint
  • 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

面试题

StreamingFileSink怎样实现写入Hdfs的精确性一次语义?
通过subtask写入的文件状态实现的,写入的文件有三种状态:可读取,写入完成,正在写入.
写入的文件有三种状态:in-process、in-pending、finshed,invoke方法里面正在写入的文件状态是in-process,当满足滚动策略之后将文件变为in-pending状态,

滚动策略

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

自定义分桶

自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
     todo: 定义子类,实现接口,重写方法,创建对象
     BucketAssigner<IN, BucketID> 中泛型参数含义:
     1) 第1个泛型参数:IN
         The type of input elements,表示数据流中每条数据类型
     2) 第2个泛型参数:BucketID
         表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
    /**
     * 自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
     *      todo: 定义子类,实现接口,重写方法,创建对象
     *      BucketAssigner<IN, BucketID> 中泛型参数含义:
     *      1) 第1个泛型参数:IN
     *          The type of input elements,表示数据流中每条数据类型
     *      2) 第2个泛型参数:BucketID
     *          表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
     */
    private static class HivePartitionBucketAssigner implements BucketAssigner<String, String> {


        // 表的名称
        @Override
        public String getBucketId(String element, Context context) {
            // a. 获取当前日期
            String currentDate = DateUtil.getCurrentDate();
            // b. 拼凑字符串: 桶id
            String bucketId = "dt=" + currentDate;
            // c. 直接返回桶id
            return bucketId;
        }
        // 序列化类 参考默认的
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }

    }

copy代码

 public static StreamingFileSink<String> getFileSink(String tableName ){
        //1. 文件写入路径
        String outputPath = ConfigLoader.get("hdfsUri") + "/user/hive/warehouse/vehicle_ods.db/" + tableName;

        //2. 创建StreamingFileSink对象
        StreamingFileSink<String> fileSink = StreamingFileSink
                // 4-1. 设置存储文件格式,Row一行一行数据存储
                .<String>forRowFormat(
                        new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")
                )
                // 4-2. 设置桶分配策略,存储目录名称,默认基于事件分配器
                .withBucketAssigner(
//                        new DateTimeBucketAssigner<String>("yyyyDDmm")
                       new HivePartitionBucketAssigner()
                )

                // 4-3. 设置数据文件滚动策略,如何产生新文件
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // 时间间隔 2分钟写入一次,如果1分钟没有数据写入 就自动写入
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(10))
                                // 多久不写入数据时间间隔
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                                // 文件大小
                                .withMaxPartSize(128 * 1024 * 1024)
                                .build()
                )
                // 4-4. 设置文件名称  车辆数据
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("vehicle")
                                .withPartSuffix(".data")
                                .build()
                )
                .build();

        return fileSink;
    }

相关文章

网友评论

      本文标题:Flink_StreamingFileSink-实时数据写入HD

      本文链接:https://www.haomeiwen.com/subject/pkbdnrtx.html