美文网首页
Flink处理实时数据,有脏数据怎么办?

Flink处理实时数据,有脏数据怎么办?

作者: LZhan | 来源:发表于2019-11-13 09:17 被阅读0次

    场景描述:Flink在处理实时数据时,假如其中一条数据时脏数据,例如格式错误导致Json转换异常,字段缺少等等,这个时候该怎么处理呢?

    解决办法:
    这种问题在Spark Sql或者Flink Sql中,最常见的办法就是直接过滤掉。

    在实际中,遇到的情况会非常多,则我们可以自定义一个UDF,这个UDF的作用就是用来处理null或者空字符串或者其他各种异常情况的。
    官方案例:

    public class HashCode extends ScalarFunction {
      private int factor = 12;
      
      public HashCode(int factor) {
          this.factor = factor;
      }
      
      public int eval(String s) {
          return s.hashCode() * factor;
      }
    }
    
    BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    
    // register the function
    tableEnv.registerFunction("hashCode", new HashCode(10));
    
    // use the function in Java Table API
    myTable.select("string, string.hashCode(), hashCode(string)");
    
    // use the function in SQL API
    tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
    

    在实际工作中,在利用env.addSource方法对接Kafka数据源后,会利用map方法将对应json串转成对象,所以会try catch,即

     this.source = this.env
                        .addSource(KafkaUtil.text("tgs-topic"))
                        .uid(Constants.UID_SOURCE_TGS)
                        .name("Kafka tgs-topic Source")
                        .map(json -> {
                            try {
                                return JSON.parseObject(json, Tgs.class);
                            } catch (Exception e) {
                                logger.error("Illegal JSON message: {}", json);
                                throw e;
                            }
                        })
                        .uid(Constants.UID_MAP_JSON_PARSE)
                        .name("Parse JSON to Tgs object");
    

    这样在遇到脏数据时,也不会因为json转换出错导致任务失败。

    相关文章

      网友评论

          本文标题:Flink处理实时数据,有脏数据怎么办?

          本文链接:https://www.haomeiwen.com/subject/cbvvyctx.html