美文网首页
Flink处理实时数据,有脏数据怎么办?

Flink处理实时数据,有脏数据怎么办?

作者: LZhan | 来源:发表于2019-11-13 09:17 被阅读0次

场景描述:Flink在处理实时数据时,假如其中一条数据时脏数据,例如格式错误导致Json转换异常,字段缺少等等,这个时候该怎么处理呢?

解决办法:
这种问题在Spark Sql或者Flink Sql中,最常见的办法就是直接过滤掉。

在实际中,遇到的情况会非常多,则我们可以自定义一个UDF,这个UDF的作用就是用来处理null或者空字符串或者其他各种异常情况的。
官方案例:

public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

在实际工作中,在利用env.addSource方法对接Kafka数据源后,会利用map方法将对应json串转成对象,所以会try catch,即

 this.source = this.env
                    .addSource(KafkaUtil.text("tgs-topic"))
                    .uid(Constants.UID_SOURCE_TGS)
                    .name("Kafka tgs-topic Source")
                    .map(json -> {
                        try {
                            return JSON.parseObject(json, Tgs.class);
                        } catch (Exception e) {
                            logger.error("Illegal JSON message: {}", json);
                            throw e;
                        }
                    })
                    .uid(Constants.UID_MAP_JSON_PARSE)
                    .name("Parse JSON to Tgs object");

这样在遇到脏数据时,也不会因为json转换出错导致任务失败。

相关文章

网友评论

      本文标题:Flink处理实时数据,有脏数据怎么办?

      本文链接:https://www.haomeiwen.com/subject/cbvvyctx.html