Flink SQL in Action: CSV Format


Author: Coder小咚 | Published 2021-07-06 23:54

    Summary: in real-time pipelines, behavioral data is usually collected as logs whose structure resembles an nginx access log. After receiving the logs, we need to parse that structure and store the data in downstream systems according to a given schema. This post describes how I recently handled click data.

    Preface

    CSV Format lets us parse and generate CSV data based on a CSV schema. The key-value pair data is then mapped into a map, and at query time the value for each field is looked up from that map by key. The different implementation approaches are described below.

    • Embed a small piece of DataStream code in the job to parse the logs, build a temporary view from the parsed data and a user-defined schema, and run the subsequent computation on that view.
    • Use a small Scala program to clean the data and parse the log format, write the result to Kafka with the simplest possible schema as the dwd layer of the real-time warehouse, and then use Flink SQL to widen and lightly aggregate it into the dws layer.
    • Parse the logs with the CSV format; anything it cannot handle directly can be covered by a custom UDF, or even by modest changes to the format's source code.

    The three options are discussed below:

    1. Parse the logs into a temporary view and run the subsequent computation on it
    package com.sht.flink
    
    import java.util.Properties
    import org.apache.commons.lang3.StringUtils
    import org.apache.commons.lang3.math.NumberUtils
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.joda.time.LocalDateTime
    import java.net.URLDecoder
    import java.time.Duration
    
    object AccessLog {
    
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.put("bootstrap.servers", "127.0.0.1:9092")
        properties.put("auto.offset.reset", "latest")
        properties.put("enable.auto.commit", "false")
        properties.put("session.timeout.ms", "120000")
        properties.put("request.timeout.ms", "180000")
        properties.put("group.id", "AccessLog")
        val senv = StreamExecutionEnvironment.createLocalEnvironment()
        val stenv = StreamTableEnvironment.create(senv)
    
        stenv.getConfig().setIdleStateRetention(Duration.ofHours(30))
    
        val kafkaConsumer = new FlinkKafkaConsumer[String]("analytics_access_log", new SimpleStringSchema(), properties)
        kafkaConsumer.setStartFromEarliest()
    
        val accessLogSourceStream = senv.addSource(kafkaConsumer).setParallelism(12)
    
        // strip '%' escapes and split each raw log line on '|'
        val accessLogRecordStream = accessLogSourceStream
          .map((message: String) => message.replace("%", "").split("\\|"))
          .filter(...) // filter predicate elided in the original: drop malformed records here
          .name("filter_access_log_reqs").uid("filter_access_log_reqs")
    
        // register the parsed stream as a temporary view with a user-defined schema
        stenv.createTemporaryView("analytics_access_log", stenv.fromDataStream(accessLogRecordStream)
          .as("user_id", "timestamp", "event_id"))
    
        stenv.executeSql("select * from analytics_access_log").print()
      }
    }
    

    Note: the stream underlying this temporary view is an append stream, but after joining it with a dimension table it becomes a changelog stream, and the approach is also not very friendly to Kafka offset management.
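
    As a hedged illustration of that note (my own sketch, not code from the article): joining the view above with a dimension table via a regular streaming join can emit retractions, so the result is no longer a plain append stream. The hypothetical JDBC table dim_user, its connector options and its fields are assumptions.

    // Hypothetical dimension table; connector options and field names are assumptions.
    stenv.executeSql("" +
      "CREATE TABLE dim_user ( " +
      "  user_id STRING, " +
      "  user_name STRING " +
      ") WITH ( " +
      "  'connector' = 'jdbc', " +
      "  'url' = 'jdbc:mysql://127.0.0.1:3306/dim', " +
      "  'table-name' = 'dim_user' " +
      ") " +
      "")

    // A regular streaming join like this can produce updates/retractions,
    // turning the append-only view into a changelog result.
    stenv.executeSql("" +
      "SELECT a.user_id, a.event_id, u.user_name " +
      "FROM analytics_access_log a " +
      "LEFT JOIN dim_user u ON a.user_id = u.user_id " +
      "").print()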

    2. Use the DataStream API to clean the data, write it back to Kafka to produce the dwd layer of the real-time warehouse, and then enrich it with dimension data in SQL. The advantage is that cleaning and widening are handled as separate steps, which makes later changes easier; the drawback is that click data is usually large, so this produces a lot of redundant data. The article omits the code; a minimal sketch is shown below.
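
    The sketch below is my own illustration, not code from the article; the dwd topic name rtdw_dwd_analytics_access_log, the pipe-delimited three-field schema and the filter predicate are assumptions.

    package com.sht.flink
    
    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
    
    object AccessLogToDwd {
    
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        properties.put("bootstrap.servers", "127.0.0.1:9092")
        properties.put("group.id", "AccessLogToDwd")
    
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
    
        // consume the raw access log (ods layer)
        val source = new FlinkKafkaConsumer[String]("analytics_access_log", new SimpleStringSchema(), properties)
    
        // clean: strip '%' escapes, split on '|', drop malformed records,
        // and re-emit a compact pipe-delimited dwd record: user_id|timestamp|event_id
        val dwdStream = senv.addSource(source)
          .map(_.replace("%", "").split("\\|"))
          .filter(_.length >= 3)
          .map(fields => s"${fields(0)}|${fields(1)}|${fields(2)}")
    
        // write the cleaned records back to Kafka as the dwd layer
        dwdStream.addSink(new FlinkKafkaProducer[String](
          "127.0.0.1:9092", "rtdw_dwd_analytics_access_log", new SimpleStringSchema()))
    
        senv.execute("AccessLogToDwd")
      }
    }
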
    3. Parse the logs with CSV Format and a custom UDF, which is also a common approach in the industry. First parse the outer structure of the log: nginx logs are usually delimited by a tab or '|', so this layer can be handled by the CSV format directly. Then a custom UDF parses the request parameters and values into a map, and queries read the values from the map by key.
    package com.sht.flink
    
    import com.sht.flink.udf.SplitQueryParamsAsMap
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    
    object CsvFormatTest {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnvironment = StreamTableEnvironment.create(env)
    
        // register the UDF that splits query_string into a MAP<STRING, STRING>
        tableEnvironment.createTemporarySystemFunction("SplitQueryParamsAsMap", classOf[SplitQueryParamsAsMap])
    
        // access the Flink configuration
        val configuration = tableEnvironment.getConfig.getConfiguration
        // set low-level key-value options: enable the OPTIONS hint used in the query below
        configuration.setString("table.dynamic-table-options.enabled", "true")
    
        tableEnvironment.executeSql("" +
          "CREATE TABLE kafka_analytics_access_log ( " +
          "  remote_addr STRING, " +
          "  host STRING, " +
          "  request_time STRING, " +
          "  time_local STRING, " +
          "  msec STRING, " +
          "  request STRING, " +
          "  status STRING, " +
          "  body_bytes_sent STRING, " +
          "  http_referer STRING, " +
          "  http_cookie STRING, " +
          "  http_user_agent STRING, " +
          "  http_x_forwarded_for STRING, " +
          "  upstream_addr STRING, " +
          "  request_length STRING, " +
          "  query_string STRING, " +
          "  procTime AS PROCTIME() " +
          ") WITH ( " +
          "  'connector' = 'kafka', " +
          "  'topic' = 'rtdw_ods_analytics_access_log_app', " +
          "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
          "  'properties.enable.auto.commit' = 'false', " +
          "  'properties.session.timeout.ms' = '90000', " +
          "  'properties.request.timeout.ms' = '325000', " +
          "  'format' = 'csv', " +
          "  'csv.field-delimiter' = '|', " +
          "  'csv.ignore-parse-errors' = 'true' " +
          ") " +
          "")
    
        tableEnvironment.executeSql("" +
          "SELECT " +
          "    ts, " +
          "    FROM_UNIXTIME(ts / 1000) AS tss, " +
          "    SUBSTR(FROM_UNIXTIME(ts / 1000), 0, 10) AS tssDay, " +
          "    CAST(COALESCE(mp['userid'], '-1') AS BIGINT) AS userId, " +
          "    COALESCE(mp['eventType'], '') AS eventType, " +
          "    CAST(COALESCE(mp['orderid'], '-1') AS BIGINT) AS orderId, " +
          "    CAST(COALESCE(mp['payment'], '-1.0') AS DOUBLE) AS payment, " +
          "    procTime " +
          "    FROM ( " +
          "    SELECT  " +
          "        CAST(REPLACE(msec, '.', '') AS BIGINT) AS ts,  " +
          "        SplitQueryParamsAsMap(REPLACE(query_string, '%', '')) AS mp, " +
          "        procTime " +
          "    FROM kafka_analytics_access_log /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ " +
          "    WHERE CHAR_LENGTH(query_string) > 1 " +
          ") t " +
          "").print()
      }
    }
    

    The UDF used:

    package com.sht.flink.udf;
    
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    @FunctionHint(output = @DataTypeHint("MAP<STRING, STRING>"))
    public class SplitQueryParamsAsMap extends ScalarFunction {
      private static final long serialVersionUID = 1L;
    
      private static Pattern ampRegex = Pattern.compile("[&]");
      private static Pattern equalRegex = Pattern.compile("[=]");
    
      public Map<String, String> eval(String queryString) throws UnsupportedEncodingException {
        Map<String, String> result = new HashMap<>();
    
        String[] kvs = ampRegex.split(queryString);
        for (String kv : kvs) {
          String[] pair = equalRegex.split(kv);
          if (pair.length == 2) {
            // nginx logs escape bytes as \xHH; convert them to %HH so URLDecoder can decode them
            result.put(pair[0], URLDecoder.decode(pair[1].replaceAll("\\\\x", "%"), "UTF-8"));
          }
        }
    
        return result;
      }
    }
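
    For example, given a cleaned query_string such as userid=123&eventType=click&orderid=456, SplitQueryParamsAsMap returns the map {userid -> 123, eventType -> click, orderid -> 456}, and the outer query reads individual values with expressions like mp['userid'].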
    

    This post mainly covered parsing behavioral data, which usually arrives as logs in a special format. In most cases CSV Format plus a custom UDF is the way to implement it: even if you parse the logs in code you still have to follow the log format anyway, although parsing in code may make complex conditional logic easier to write.
