美文网首页数客联盟程序员kafka
如何使用MorphlineInterceptor进行数据清洗

如何使用MorphlineInterceptor进行数据清洗

作者: Woople | 来源:发表于2017-06-14 14:55 被阅读626次

    业务背景

    基站同时发送3g和4g的信令到本地文件,通过flume将本地文件接入到kafka的某个topic中。
    在这个过程中,需要对3g信令数据进行清洗,满足如下要求:

    • 假设3g信令的格式为imsi,lac,type,timestamp;4g信令的格式为imsi,tac, number,area_code,timestamp。由于格式不同一,第一步就需要将3g的信令格式补全为4g的格式,没有的字段用空格补全。

    • 将3g信令的timestamp的格式统一为4g的yyyy-MM-dd HH:mm:ss格式

    • 将3g信令中的type由16进制转为10进制

    环境搭建

    使用强大的morphlines可以满足上面数据清洗的要求。
    本文使用的环境是基于HDP-2.5.3.0,具体版本如下

    需要将必要的jar包放到/usr/hdp/2.5.0.0-1245/flume/lib下面

    solr-analysis-extras-5.5.2.jar
    solr-clustering-5.5.2.jar
    solr-cell-5.5.2.jar
    solr-analytics-5.5.2.jar
    solr-dataimporthandler-5.5.2.jar
    solr-core-5.5.2.jar
    solr-dataimporthandler-extras-5.5.2.jar
    solr-langid-5.5.2.jar
    solr-morphlines-core-5.5.2.jar
    solr-morphlines-cell-5.5.2.jar
    solr-map-reduce-5.5.2.jar
    solr-uima-5.5.2.jar
    solr-test-framework-5.5.2.jar
    solr-solrj-5.5.2.jar
    solr-velocity-5.5.2.jar
    kite-morphlines-avro-1.1.0.jar
    config-1.0.2.jar
    kite-morphlines-core-1.1.0.jar
    metrics-healthchecks-3.0.1.jar
    metrics-core-3.0.1.jar
    

    这些jar包都可以在安装好hdp的环境中找到,例如/usr/lib/ambari-infra-solr/dist/usr/lib/ambari-infra-solr/contrib/morphlines-core/lib

    flume配置

    agent.sources = r1
    agent.channels = c1
    agent.sinks = s1
    
    agent.sources.r1.type = spooldir
    agent.sources.r1.spoolDir = /tmp/flume
    
    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type=org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
    agent.sources.r1.interceptors.i1.morphlineFile=/home/user1/morphline.conf
    agent.sources.r1.interceptors.i1.morphlineId=morphline1
    
    agent.sinks.s1.type =org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.s1.topic = tutorials
    agent.sinks.s1.brokerList = host1:6667
    agent.sinks.s1.requiredAcks = 1
    agent.sinks.s1.batchSize = 20
    
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    
    agent.sinks.s1.channel = c1
    agent.sources.r1.channels = c1
    

    将3g信令数据放到/tmp/flume中,例如数据文件为3g.log

    11111,213,7b,1497419767
    2222,433,1c8,1497469768
    3333,981,315,1497919769
    

    morphline.conf

    下面重点说一下,如何配置morphline.conf。
    先看一下完整的配置

    morphlines: [
      {
        id: morphline1
    
        importCommands: ["org.kitesdk.**", "org.apache.solr.**"]
    
        commands: [
          {
            readLine {
              charset: UTF-8
            }
          }
    
          {
            split {
              inputField: message
              outputFields: [imsi, lac, type, timestamp]
              separator: ","
              isRegex: false
              addEmptyStrings: false
              trim: true
            }
          }
    
          {
            convertTimestamp {
              field : timestamp
              inputFormats : ["unixTimeInSeconds"]
              inputTimezone : UTC
              outputFormat : "yyyy-MM-dd HH:mm:ss"
              outputTimezone : UTC
            }
          }
    
          {
            java {
              imports: "import java.util.*;"
              code: """
                    List<String> type = record.get("type");
    
                    if(type.size()==1){
                       type.set(0,Integer.valueOf(type.get(0),16).toString());
                    }
    
                   return child.process(record);
                   """
            }
          }
    
          {
            setValues {
              _attachment_body: "@{imsi},@{lac},,,@{timestamp}"
              key: "@{imsi}"
            }
          }
    
          {
            toByteArray {field: _attachment_body}
          }
    
        ]
      }
    ]
    
    • readLine是将每行数据读入
    • split将每行数据按照逗号分隔成指定的字段
    • convertTimestamp将时间戳转为yyyy-MM-dd HH:mm:ss格式
    • java里面的代码实现16进制转10进制的功能
    • setValues设置Event的body和key
    • toByteArray必须将body转为Byte数组

    测试结果

    通过kafka的命令sh /usr/hdp/2.5.0.0-1245/kafka/bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic tutorials --property print.key=true查看结果为

    3333    3333,981,,,2017-06-20 00:49:29
    11111   11111,213,,,2017-06-14 05:56:07
    2222    2222,433,,,2017-06-14 19:49:28
    

    已经按照要求全部转化完成。morphlines的功能非常强大,基本可以解决常用的数据清洗的场景。

    相关文章

      网友评论

      本文标题:如何使用MorphlineInterceptor进行数据清洗

      本文链接:https://www.haomeiwen.com/subject/ygmsqxtx.html