美文网首页
Flink_StreamingFileSink-实时数据写入HD

Flink_StreamingFileSink-实时数据写入HD

作者: Eqo | 来源:发表于2022-09-01 09:12 被阅读0次

    Flink DataStream中将流数据保存HDFS文件系统方式:

    • 第一种方式:自定义Sink,实现RichSinkFunction

      • 使用JDBC的方式将数据写入到Hive数据库中,这种方式效率比较低
      • 原因在于:INSERT INTO 插入数据,底层运行MapReduce程序,所以不推荐使用,了解即可。
    • 第二种方式:StreamingFileSink Connector

      • 流式写入HDFS文件,吞吐量较高

    StreamingFileSink 实现

    数据落地HDFS,使用Flink DataStream中自带Connector:StreamingFileSink,将分区文件写入到支持 [Flink FileSystem]
    (https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/filesystems/index.html) 接口的文件系统中。

    概念
    1.StreamingFileSink 会将数据写入Bucket桶中(可以理解为Hive的分区目录dt=yyyy-DD-mm)
    2.每个子任务写入数据的时候,都有一个单独的文件.
    3.写入的文件有三种状态
    - finshed 可读取的文件
    - pending-flie 写入完成的文件,但不能读取
    - in-process 正在写入文件

    设置
    创建StreamingFlieSink实例对象有意向四个设置

    • 数据存储格式
      row行 bulk(列示)(avro,orc Parquet )

    • 分桶策略(十分重要)
      分区目录名称,

    • 文件滚动策略
      按照文件大小和超时时间滚动文件

    • 文件名称
      文件前缀 文件名后缀

    重要

    • 在使用StreamingFileSink 必须开启 Chckpoint
    • 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

    面试题

    StreamingFileSink怎样实现写入Hdfs的精确性一次语义?
    通过subtask写入的文件状态实现的,写入的文件有三种状态:可读取,写入完成,正在写入.
    写入的文件有三种状态:in-process、in-pending、finshed,invoke方法里面正在写入的文件状态是in-process,当满足滚动策略之后将文件变为in-pending状态,

    滚动策略

    滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

    自定义分桶

    自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
         todo: 定义子类,实现接口,重写方法,创建对象
         BucketAssigner<IN, BucketID> 中泛型参数含义:
         1) 第1个泛型参数:IN
             The type of input elements,表示数据流中每条数据类型
         2) 第2个泛型参数:BucketID
             表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
    
        /**
         * 自定义StreamingFileSink中桶分配器策略,实现与Hive中分区路径一致:dt=2022-08-30
         *      todo: 定义子类,实现接口,重写方法,创建对象
         *      BucketAssigner<IN, BucketID> 中泛型参数含义:
         *      1) 第1个泛型参数:IN
         *          The type of input elements,表示数据流中每条数据类型
         *      2) 第2个泛型参数:BucketID
         *          表示桶名称,直接返回String字符串即可,表示数据流中每条数据写入目录
         */
        private static class HivePartitionBucketAssigner implements BucketAssigner<String, String> {
    
    
            // 表的名称
            @Override
            public String getBucketId(String element, Context context) {
                // a. 获取当前日期
                String currentDate = DateUtil.getCurrentDate();
                // b. 拼凑字符串: 桶id
                String bucketId = "dt=" + currentDate;
                // c. 直接返回桶id
                return bucketId;
            }
            // 序列化类 参考默认的
            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
    
        }
    

    copy代码

     public static StreamingFileSink<String> getFileSink(String tableName ){
            //1. 文件写入路径
            String outputPath = ConfigLoader.get("hdfsUri") + "/user/hive/warehouse/vehicle_ods.db/" + tableName;
    
            //2. 创建StreamingFileSink对象
            StreamingFileSink<String> fileSink = StreamingFileSink
                    // 4-1. 设置存储文件格式,Row一行一行数据存储
                    .<String>forRowFormat(
                            new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")
                    )
                    // 4-2. 设置桶分配策略,存储目录名称,默认基于事件分配器
                    .withBucketAssigner(
    //                        new DateTimeBucketAssigner<String>("yyyyDDmm")
                           new HivePartitionBucketAssigner()
                    )
    
                    // 4-3. 设置数据文件滚动策略,如何产生新文件
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    // 时间间隔 2分钟写入一次,如果1分钟没有数据写入 就自动写入
                                    .withRolloverInterval(TimeUnit.MINUTES.toMillis(10))
                                    // 多久不写入数据时间间隔
                                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                                    // 文件大小
                                    .withMaxPartSize(128 * 1024 * 1024)
                                    .build()
                    )
                    // 4-4. 设置文件名称  车辆数据
                    .withOutputFileConfig(
                            OutputFileConfig.builder()
                                    .withPartPrefix("vehicle")
                                    .withPartSuffix(".data")
                                    .build()
                    )
                    .build();
    
            return fileSink;
        }
    
    

    相关文章

      网友评论

          本文标题:Flink_StreamingFileSink-实时数据写入HD

          本文链接:https://www.haomeiwen.com/subject/pkbdnrtx.html