使用flink 中遇到的问题总结

作者: 卫渐行 | 来源:发表于2021-02-22 20:37 被阅读0次

    问题一:如何保证数据按照事件时间准确的落到同一个分区;

    /**
     * @Author:wenwei
     * @Date : 2020/9/8 22:15
     * 自定义分桶的规则
     * 1:按照什么格式定义文件名,默认为yyyy-MM-dd-HH
     */
    @PublicEvolving
    public class CustomBucketAssigner<IN> implements BucketAssigner<IN, String> {
    
        private static final long serialVersionUID = 1L;
    
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
    
        private final   String formatString;
    
        private final ZoneId zoneId;
    
        private transient DateTimeFormatter dateTimeFormatter;
    
        /**
         * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
         */
        public CustomBucketAssigner() {
            this(DEFAULT_FORMAT_STRING);
        }
    
        /**
         * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
         *
         * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
         *                     the bucket id.
         */
        public CustomBucketAssigner(String formatString) {
            this(formatString, ZoneId.systemDefault());
        }
    
        /**
         * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
         *
         * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
         */
        public CustomBucketAssigner(ZoneId zoneId) {
            this(DEFAULT_FORMAT_STRING, zoneId);
        }
    
        /**
         * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
         *
         * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
         *                     the bucket path.
         * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
         */
        public CustomBucketAssigner(String formatString, ZoneId zoneId) {
            this.formatString = Preconditions.checkNotNull(formatString);
            this.zoneId = Preconditions.checkNotNull(zoneId);
        }
    //将分桶的规则写成按照事件时间;
        @Override
        public String getBucketId(IN element, BucketAssigner.Context context) {
            if (dateTimeFormatter == null) {
                dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
            }
            //固定格式命名文件夹名称
            return "p_data_day="+dateTimeFormatter.format(Instant.ofEpochMilli(context.currentWatermark()));
        }
    
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    
        @Override
        public String toString() {
            return "DateTimeBucketAssigner{" +
                    "formatString='" + formatString + '\'' +
                    ", zoneId=" + zoneId +
                    '}';
        }
    
    }
    

    问题二: flink 如何准确的划分窗口的?

    如何正确定义window的窗口时间,保证数据都会准确的按照事件分区,不会将前一天的数据,落入到下一个时间分区里面;可以参考windows 中的源码,其中定义start时间,值得参考

    /**
         * Method to get the window start for a timestamp.
         *
         * @param timestamp epoch millisecond to get the window start. 事件发生的时间 
         * @param offset The offset which window start would be shifted by.  定义TumblingEventTimeWindows 设置云讯的offset的值,默认都为零
         * @param windowSize The size of the generated windows.  窗口大小
         * @return window start
         对应的数据应windows
         
        例如 windows Size = 5s  ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s 
        2 - (2-0+5) % 5 = 0 ,
        7 - (7 - 0 + 5) % 5 = 5 , 
        例如 windows Size = 7s  ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s 
        2 - (2-0+7) % 7 = 0;
        7 - (7-0+7)%7= 7
        
        例如 windows Size = 5s  ,offset = 1s ; 例如当前的 timestamp = 2s ; 7s 
        2 - (2-1+5) % 5 = 1 ,
        7 - (7 - 0 + 5) % 5 = 6 , 
        例如 windows Size = 7s  ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s 
        2 - (2-1+7) % 7 = 1;
        7 - (7-1+7)%7= 8
        
         */
        public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
        }
    

    问题三 : 由于数据量不断增大,解析IP地址的时候,导致文件句柄过多;

    • 将解析ip的类改造成单例类,有待优化
    
    public class Ip2regionSingleton {
    
        private static Logger logger = LoggerFactory.getLogger(Ip2regionSingleton.class);
    
        private static Ip2regionSingleton instance = new Ip2regionSingleton();
    
        private static DbConfig config;
        private static DbSearcher searcher;
    
    
        public DbSearcher getSearcher() {
            return searcher;
        }
    
    
        // 私有化构造方法
        private Ip2regionSingleton() {
    
            String path = Ip2regionSingleton.class.getResource("/").getPath();
            String dbPath =  path + "plugins/ip2region.db";
            File file = new File(dbPath);
    
            logger.info("singleton count:{}","-------------------------------------------------------");
    
            if ( file.exists()  ) {
    
                try{
                    config = new DbConfig();
                    searcher = new DbSearcher(config, dbPath);
    
                }catch (Exception e){
                    logger.error("Ip2regionSingleton:{}",e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    
        public static Ip2regionSingleton getInstance() {
            return instance;
        }
    
    }
    

    问题四: 如何解决flink pom文件中 ,包依赖的问题;

    • maven helper ;找到相应冲突的jar类;
    • 通过exclude方式,去除掉冲突的jar类

    问题五: 如何保证flink中的,端到端数据的一致性,顺序性;

    • 保证kafka中数据的顺序性;(做到全局的数据的顺序性基本上不可能,但是可以做到单分区的数据一致性)

    • kaka中,每个机器有个broker,broker里面有多个partition,partition之间通过主从方式复制;这样保证数据的一致性;

    • flink中设置 exactly oncely的语义;

       env.enableCheckpointing(parameter.getLong("checkpoint.cycle",300*1000L),CheckpointingMode.EXACTLY_ONCE);
    

    问题六: 如何保证在无事件数据更新的时候,更新watermark的值,然后触发窗口的计算

    • 在处理某些数据的时候,数据流的时间更新时间间隔大于窗口的大小,如果使用PunctuatedWatermarks 会导致watermark一直不更新;改成AssignerWithPeriodicWatermarks周期性的更新的watermark即可

      private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<PageActivityDO> {
              private static final long serialVersionUID = 1L;
              private Long currentTime = 0L;
              //允许2分钟的延迟
              private Long allowDelayTime = 120L;
              @Override
              public Watermark checkAndGetNextWatermark(PageActivityDO topic, long l) {
                  return new Watermark(currentTime - allowDelayTime);
              }
              @Override
              public long extractTimestamp(PageActivityDO topic, long l) {
                  DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
      
                  if(StringUtils.isNullOrWhitespaceOnly(topic.getPoint_time())){
                      return currentTime;
                  }
                  LocalDateTime localDateTime = LocalDateTime.parse(topic.getPoint_time(), formatter);
                  currentTime = Math.max(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(), currentTime);
      
                  return currentTime;
              }
      
      
          }
      
      private static class CustomWatermarksPeriodc<T> implements AssignerWithPeriodicWatermarks<ActivityInfoDO> {
            private static final long serialVersionUID = 1L;
            //允许30s的延迟
            private Long allowDelayTime = 30000L;
    
            @Override
            public long extractTimestamp(ActivityInfoDO topic, long l) {
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
                if(StringUtils.isNullOrWhitespaceOnly(topic.getPush_time())){
                    return System.currentTimeMillis();
                }
                LocalDateTime localDateTime = LocalDateTime.parse(topic.getPush_time(), formatter);
                logger.info("extractTimestamp,currentWatermark:{}",localDateTime );
                return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    
    
            }
    
    
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                logger.info("getCurrentWatermark, currentWatermark:{}",System.currentTimeMillis() - allowDelayTime);
                return new Watermark(System.currentTimeMillis() - allowDelayTime);
            }
        }
    
    • 特别注意 选用 Periodic WatermarkGenerator 需要设置自动watermark更新机制, setAutoWatermarkInterval(1000)

    问题七:如何保证两阶段提交的实现,保证数据能够幂等性写入和事务性的写入

    • 保证数据源数据可重放
    • 数据sink支持事务处理(预提交,回滚,提交)
    • 或者通过sink的地方,支持唯一性去重

    问题八:sink to mysql 的时候,经常报错

    • 报错类型 : The last packet successfully received from the server was 1,203,500 milliseconds ago.
    • 有可能是jdbc版本出现,同时最好采用mysql 连接池

    正确的使用valueState

    flink 对于不是大规模的中间态的管理,可以选用 fsStateBackend ;StateBackend fsStateBackend = new FsStateBackend(parameter.get("flink.state.path"));

    • 其中包括状态的保留时间;更新类型;是否可见
     StateTtlConfig   ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(ttlDays))
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    //                .cleanupInRocksdbCompactFilter(1000L)
                    .build();
    

    参考链接:

    1: Flink最难知识点再解析 | 时间/窗口/水印/迟到数据处理

    2:Flink 中 timeWindow 滚动窗口边界和数据延迟问题调研

    3: Kafka 概述:深入理解架构

    4: generate watermarks

    5:两阶段提交(2PC)与其在Flink exactly once中的应用

    提醒,使用的是flink 1.9的版本

    相关文章

      网友评论

        本文标题:使用flink 中遇到的问题总结

        本文链接:https://www.haomeiwen.com/subject/fkdkfltx.html