前言
CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据,然后再将健值对数据映射成map,查询时根据字段取出对应的值作为字段值,下面分不同的实现方式进行阐述。
- 程序中嵌入一小段DataStream代码解析日志,根据解析后的数据结合用户定义的schema生成TemporaryView,后续基于此进行计算。
- 用一小段scala程序清洗数据,然后解析日志数据格式,解析完之后以最简单的schema形式存入kafka作为实时数仓的dwd层,后续采用flink sql进行数据的打宽和轻度的聚合作为数仓的dws层。
- 采用csv fromat的方式解析日志,不能直接解析的可以自定义udf函数,甚至可以适当的修改其源代码。
接下来就上面三个方案进行讨论:
- 日志解析成临时表,后续基于临时表进行计算
package com.sht.flink
import java.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.math.NumberUtils
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.joda.time.LocalDateTime
import java.net.URLDecoder
import java.time.Duration
object AccessLog {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.put("bootstrap.servers", "127.0.0.1:9092")
properties.put("auto.offset.reset", "latest")
properties.put("enable.auto.commit", "false")
properties.put("session.timeout.ms", "120000")
properties.put("request.timeout.ms", "180000")
properties.put("group.id", "AccessLog")
val senv = StreamExecutionEnvironment.createLocalEnvironment()
val stenv = StreamTableEnvironment.create(senv)
stenv.getConfig().setIdleStateRetention(Duration.ofHours(30))
val kafkaConsumer = new FlinkKafkaConsumer[String]("analytics_access_log", new SimpleStringSchema(), properties)
kafkaConsumer.setStartFromEarliest()
val accessLogSourceStream = senv.addSource(kafkaConsumer).setParallelism(12)
val accessLogRecordStream = accessLogSourceStream
.map((message: String) => message.replace("%", "").split("\\|"))
.filter(...)
.name("filter_access_log_reqs").uid("filter_access_log_reqs")
stenv.createTemporaryView("analytics_access_log", stenv.fromDataStream(accessLogRecordStream)
.as("user_id", "timestamp", "event_id"))
stenv.executeSql("select * from analytics_access_log").print()
}
}
注:该临时表底层的流是Append Stream,但是跟维表join之后会变成Changelog Stream,且对于kafka的offset管理不太友好。
- 采用DataStream API对数据进行清洗,然后回写kafka产生实时数仓dwd层,后续基于SQL再进行补维操作,此方法的好处是对数据清洗和打宽分开进行处理,方便后续修改,不好的地方是点击数据一般比较大,这样会产生很多冗余数据。代码省略。。。
- 基于CSC Format和自定义UDF进行日志解析,该方法也是业内常用的方式。 首先解析日志外层的数据结构,如果是nginx日志,一般用tab或者|分割,这一层直接用csv format解析即可,然后自定义UDF将日志中请求的参数和值进行解析,存入map中,查询时根据key去读。
package com.sht.flink
import com.sht.flink.udf.SplitQueryParamsAsMap
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object CsvFormatTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnvironment = StreamTableEnvironment.create(env)
/// 注册函数
tableEnvironment.createTemporarySystemFunction("SplitQueryParamsAsMap", classOf[SplitQueryParamsAsMap])
// access flink configuration// access flink configuration
val configuration = tableEnvironment.getConfig.getConfiguration
// set low-level key-value options
configuration.setString("table.dynamic-table-options.enabled", "true")
tableEnvironment.executeSql("" +
"CREATE TABLE kafka_analytics_access_log ( " +
" remote_addr STRING, " +
" host STRING, " +
" request_time STRING, " +
" time_local STRING, " +
" msec STRING, " +
" request STRING, " +
" status STRING, " +
" body_bytes_sent STRING, " +
" http_referer STRING, " +
" http_cookie STRING, " +
" http_user_agent STRING, " +
" http_x_forwarded_for STRING, " +
" upstream_addr STRING, " +
" request_length STRING, " +
" query_string STRING, " +
" procTime AS PROCTIME() " +
") WITH ( " +
" 'connector' = 'kafka', " +
" 'topic' = 'rtdw_ods_analytics_access_log_app', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'properties.enable.auto.commit' = 'false', " +
" 'properties.session.timeout.ms' = '90000', " +
" 'properties.request.timeout.ms' = '325000', " +
" 'format' = 'csv', " +
" 'csv.field-delimiter' = '|', " +
" 'csv.ignore-parse-errors' = 'true' " +
") " +
"")
tableEnvironment.executeSql("" +
"SELECT " +
" ts, " +
" FROM_UNIXTIME(ts / 1000) AS tss, " +
" SUBSTR(FROM_UNIXTIME(ts / 1000), 0, 10) AS tssDay, " +
" CAST(COALESCE(mp['userid'], '-1') AS BIGINT) AS userId, " +
" COALESCE(mp['eventType'], '') AS eventType, " +
" CAST(COALESCE(mp['orderid'], '-1') AS BIGINT) AS orderId, " +
" CAST(COALESCE(mp['payment'], '-1.0') AS DOUBLE) AS payment, " +
" procTime " +
" FROM ( " +
" SELECT " +
" CAST(REPLACE(msec, '.', '') AS BIGINT) AS ts, " +
" SplitQueryParamsAsMap(REPLACE(query_string, '%', '')) AS mp, " +
" procTime " +
" FROM kafka_analytics_access_log /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ " +
" WHERE CHAR_LENGTH(query_string) > 1 " +
") t " +
"").print()
}
}
用到的UDF函数
package com.sht.flink.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
@FunctionHint(output = @DataTypeHint("MAP<STRING, STRING>"))
public class SplitQueryParamsAsMap extends ScalarFunction {
private static final long serialVersionUID = 1L;
private static Pattern ampRegex = Pattern.compile("[&]");
private static Pattern equalRegex = Pattern.compile("[=]");
public Map<String, String> eval(String queryString) throws UnsupportedEncodingException {
Map<String, String> result = new HashMap<>();
String[] kvs = ampRegex.split(queryString);
for (String kv : kvs) {
String[] pair = equalRegex.split(kv);
if (pair.length == 2) {
result.put(pair[0], URLDecoder.decode(pair[1].replaceAll("\\\\x", "%"), "UTF-8"));
}
}
return result;
}
}
结
这一篇主要介绍了行为数据(一般为特殊格式的日志)的解析。一般来说都会用CSV Format外加自定义UDF的方式实现,因为就算用程序解析的话也得按照日志的格式嘛,可能程序解析更容易写复杂的判断。
网友评论