业务背景
基站同时发送3g和4g的信令到本地文件,通过flume将本地文件接入到kafka的某个topic中。
在这个过程中,需要对3g信令数据进行清洗,满足如下要求:
-
假设3g信令的格式为imsi,lac,type,timestamp;4g信令的格式为imsi,tac, number,area_code,timestamp。由于格式不同一,第一步就需要将3g的信令格式补全为4g的格式,没有的字段用空格补全。
-
将3g信令的timestamp的格式统一为4g的yyyy-MM-dd HH:mm:ss格式
-
将3g信令中的type由16进制转为10进制
环境搭建
使用强大的morphlines可以满足上面数据清洗的要求。
本文使用的环境是基于HDP-2.5.3.0,具体版本如下
需要将必要的jar包放到/usr/hdp/2.5.0.0-1245/flume/lib
下面
solr-analysis-extras-5.5.2.jar
solr-clustering-5.5.2.jar
solr-cell-5.5.2.jar
solr-analytics-5.5.2.jar
solr-dataimporthandler-5.5.2.jar
solr-core-5.5.2.jar
solr-dataimporthandler-extras-5.5.2.jar
solr-langid-5.5.2.jar
solr-morphlines-core-5.5.2.jar
solr-morphlines-cell-5.5.2.jar
solr-map-reduce-5.5.2.jar
solr-uima-5.5.2.jar
solr-test-framework-5.5.2.jar
solr-solrj-5.5.2.jar
solr-velocity-5.5.2.jar
kite-morphlines-avro-1.1.0.jar
config-1.0.2.jar
kite-morphlines-core-1.1.0.jar
metrics-healthchecks-3.0.1.jar
metrics-core-3.0.1.jar
这些jar包都可以在安装好hdp的环境中找到,例如/usr/lib/ambari-infra-solr/dist
和/usr/lib/ambari-infra-solr/contrib/morphlines-core/lib
。
flume配置
agent.sources = r1
agent.channels = c1
agent.sinks = s1
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /tmp/flume
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type=org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.r1.interceptors.i1.morphlineFile=/home/user1/morphline.conf
agent.sources.r1.interceptors.i1.morphlineId=morphline1
agent.sinks.s1.type =org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = tutorials
agent.sinks.s1.brokerList = host1:6667
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.sinks.s1.channel = c1
agent.sources.r1.channels = c1
将3g信令数据放到/tmp/flume
中,例如数据文件为3g.log
11111,213,7b,1497419767
2222,433,1c8,1497469768
3333,981,315,1497919769
morphline.conf
下面重点说一下,如何配置morphline.conf。
先看一下完整的配置
morphlines: [
{
id: morphline1
importCommands: ["org.kitesdk.**", "org.apache.solr.**"]
commands: [
{
readLine {
charset: UTF-8
}
}
{
split {
inputField: message
outputFields: [imsi, lac, type, timestamp]
separator: ","
isRegex: false
addEmptyStrings: false
trim: true
}
}
{
convertTimestamp {
field : timestamp
inputFormats : ["unixTimeInSeconds"]
inputTimezone : UTC
outputFormat : "yyyy-MM-dd HH:mm:ss"
outputTimezone : UTC
}
}
{
java {
imports: "import java.util.*;"
code: """
List<String> type = record.get("type");
if(type.size()==1){
type.set(0,Integer.valueOf(type.get(0),16).toString());
}
return child.process(record);
"""
}
}
{
setValues {
_attachment_body: "@{imsi},@{lac},,,@{timestamp}"
key: "@{imsi}"
}
}
{
toByteArray {field: _attachment_body}
}
]
}
]
- readLine是将每行数据读入
- split将每行数据按照逗号分隔成指定的字段
- convertTimestamp将时间戳转为yyyy-MM-dd HH:mm:ss格式
- java里面的代码实现16进制转10进制的功能
- setValues设置Event的body和key
- toByteArray必须将body转为Byte数组
测试结果
通过kafka的命令sh /usr/hdp/2.5.0.0-1245/kafka/bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic tutorials --property print.key=true
查看结果为
3333 3333,981,,,2017-06-20 00:49:29
11111 11111,213,,,2017-06-14 05:56:07
2222 2222,433,,,2017-06-14 19:49:28
已经按照要求全部转化完成。morphlines的功能非常强大,基本可以解决常用的数据清洗的场景。
网友评论