
Building a Data Reporting (Collection) System

Author: 你值得拥有更好的12138 | Published 2019-11-20 20:10

    This article is only meant as a reference for the overall approach; feedback from more experienced readers is welcome.

    I. Physical architecture

    [Diagram: physical architecture of the test environment]
    This is the test environment; the production environment would more likely look like the following.
    [Diagram: physical architecture of the production environment]
    The rest of this article only covers setting up the test environment.

    II. Data flow

    1. Reporting method: HTTP GET
    2. The request parameters map one-to-one onto the columns of a Hive table (a worked example follows the diagram below)
    3. Log cleaning is done with Spark

    [Diagram: data flow]
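
    To make the parameter-to-table mapping concrete, here is a rough sketch of a reported event; the parameter names (u, u1, u2, ...), the table name and the host are assumptions inferred from the Spark cleaning code in step 4, not something the article spells out:

    # A client reports an event with a plain GET request; &rd is a cache-busting
    # random number that the cleaning job strips off (all names/values illustrative):
    curl "http://test-nginx-host/?u=app_click&u1=10023&u2=android&rd=0.4711"

    Nginx records the request line in access.log; the Spark job then sorts the parameters by index (u first, then u1, u2, ...), joins their values with tabs, and writes them under a directory named after the value of u, which presumably corresponds to the Hive table loaded in step 5.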

    Code

    The implementation boils down to the following steps, which can be scheduled with Oozie or another scheduling tool.
    1. Nginx log rotation

    #!/bin/bash
    # Rotate yesterday's Nginx access/error logs into the history directory,
    # then signal Nginx with USR1 so it reopens fresh log files.
    YESTERDAY=$(date -d "yesterday" +"%Y%m%d")
    LOGPATH=/usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/

    PID=${LOGPATH}nginx.pid
    mv ${LOGPATH}access.log ${LOGPATH}history/access-${YESTERDAY}.log
    mv ${LOGPATH}error.log ${LOGPATH}history/error-${YESTERDAY}.log

    kill -USR1 $(cat ${PID})
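
    The rotation itself needs to be scheduled; assuming the script above is saved as /usr/local/cluster_app/scripts/nginx-log-rotate.sh (a hypothetical path), a plain cron entry run at midnight would do, with Oozie (mentioned above) as an alternative:

    # Hypothetical crontab entry: rotate the Nginx logs at 00:00 every day
    0 0 * * * /usr/local/cluster_app/scripts/nginx-log-rotate.sh >> /usr/local/cluster_app/scripts/nginx-log-rotate.log 2>&1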
    

    2. Flume load-balancing configuration (agent on the Nginx host)

    # Agent on the Nginx host: tails access.log and load-balances
    # events across two downstream collectors over Avro.
    a1.sources = nginxLog
    a1.channels = memoryChannel
    a1.sinks = sink1 sink2
    a1.sinkgroups = sinkGroup

    # Source: tail the Nginx access log
    a1.sources.nginxLog.type = exec
    a1.sources.nginxLog.command = tail -F /usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/access.log
    a1.sources.nginxLog.channels = memoryChannel

    # Channel: in-memory buffer
    a1.channels.memoryChannel.type = memory
    a1.channels.memoryChannel.capacity = 500000
    a1.channels.memoryChannel.transactionCapacity = 10000
    a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
    a1.channels.memoryChannel.byteCapacity = 800000

    # Sinks: Avro to the two collector hosts
    a1.sinks.sink1.type = avro
    a1.sinks.sink1.channel = memoryChannel
    a1.sinks.sink1.hostname = Test-Hadoop-003
    a1.sinks.sink1.port = 4545

    a1.sinks.sink2.type = avro
    a1.sinks.sink2.channel = memoryChannel
    a1.sinks.sink2.hostname = Test-Hadoop-004
    a1.sinks.sink2.port = 4545

    # Sink group: round-robin load balancing with backoff
    a1.sinkgroups.sinkGroup.sinks = sink1 sink2
    a1.sinkgroups.sinkGroup.processor.type = load_balance
    a1.sinkgroups.sinkGroup.processor.backoff = true
    a1.sinkgroups.sinkGroup.processor.selector = round_robin
    a1.sinkgroups.sinkGroup.processor.selector.maxTimeOut = 30000
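
    Assuming the file above is saved as, say, conf/agent-loadbalance.conf (the file name and paths are illustrative), the agent can be started with the standard flume-ng launcher; --name must match the a1 prefix used in the configuration. The same launcher is used for the collector agents in step 3.

    flume-ng agent --conf conf --conf-file conf/agent-loadbalance.conf \
        --name a1 -Dflume.root.logger=INFO,console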
    
    

    3. Flume collector (sink) configuration, identical on both downstream machines

    a1.sources = avroSrc
    a1.channels = memoryChannel
    a1.sinks = k1

    # Source: receive Avro events from the load-balancing agent
    a1.sources.avroSrc.type = avro
    a1.sources.avroSrc.bind = 0.0.0.0
    a1.sources.avroSrc.port = 4545

    # Sink: write hour-partitioned text files to HDFS
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.path = hdfs://36.251.250.6:8020/flume/kankan/app/ds=%Y%m%d/hour=%H/
    a1.sinks.k1.hdfs.filePrefix = nginx-%Y%m%d%H
    a1.sinks.k1.hdfs.fileSuffix = .log
    a1.sinks.k1.hdfs.rollInterval = 60
    a1.sinks.k1.hdfs.rollSize = 5120
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.serializer = TEXT
    a1.sinks.k1.serializer.appendNewline = true

    # Channel: in-memory buffer
    a1.channels.memoryChannel.type = memory
    a1.channels.memoryChannel.capacity = 500000
    a1.channels.memoryChannel.transactionCapacity = 1000
    a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
    a1.channels.memoryChannel.byteCapacity = 5000000000

    # Wire source and sink to the channel
    a1.sinks.k1.channel = memoryChannel
    a1.sources.avroSrc.channels = memoryChannel
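
    Once the collector agents are running, the sink should start producing hour-partitioned directories on HDFS; a quick sanity check (the date and hour below are illustrative) is to list one of them:

    hdfs dfs -ls hdfs://36.251.250.6:8020/flume/kankan/app/ds=20191120/hour=10/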
    

    4. Spark cleaning code (adapt to your needs)

    package com.kankan.tongji
    
    import java.net.URI
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.http.util.TextUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Writes each record into a subdirectory named after its key
    // (the value of the leading "u" parameter).
    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()

      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString + "/" + name
    }

    object RDDMultipleTextOutputFormat {
      // The article calls getFrontHour() without showing it; this is an assumed
      // implementation that returns the previous hour as "yyyyMMdd HH", matching
      // the Flume partition layout ds=%Y%m%d/hour=%H.
      def getFrontHour(): String = {
        val cal = java.util.Calendar.getInstance()
        cal.add(java.util.Calendar.HOUR_OF_DAY, -1)
        new java.text.SimpleDateFormat("yyyyMMdd HH").format(cal.getTime)
      }
    }
    
    object NginxEtl {
      private val APP_NAME = ConfigUtil.getValue("appName")
      private val FLUME_LOG_DIR = ConfigUtil.getValue("flumeLog")
      private val HDFS_URL = ConfigUtil.getValue("hdfs")
      private val HDFS_URL_PORT = ConfigUtil.getValue("hdfsWithPort")
    
      def etl(): Unit = {
        val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[*]")
        val sc = new SparkContext(conf)
        val currentTime = RDDMultipleTextOutputFormat.getFrontHour().split(" ")
        val date = currentTime(0)
        val hour = currentTime(1)
        var log: RDD[String] = null
        val flumeLog = FLUME_LOG_DIR + "ds=%s/hour=%s/".format(date,hour)
        val hdfs = FileSystem.get(
          new URI(HDFS_URL_PORT), new Configuration())
    
        if(!hdfs.exists(new Path(flumeLog))){
          println("Hdfs directory is't exist,check it:%s.".format(flumeLog))
          return
        }
    
        log = sc.textFile(flumeLog)
    // Field index 6 of the default Nginx access-log line is the request URI;
    // keep only non-empty lines whose URI is a report request (/?u=...).
    log.filter(_.nonEmpty).filter { l =>
      l.split(" ")(6).startsWith("/?u=")
        }.mapPartitions(partition =>
          partition.map {
            lg =>
              val url: String = lg.split(" ")(6)
              val arr: Array[String] = url.replace("/?", "&").split("&rd")(0).split("&u")
              // drop the leading empty element produced by splitting on "&u"
              val mid = arr.takeRight(arr.length - 1)
              // split each "index=value" pair; tolerate missing values
              val result = mid.map { x =>
                val kv: Array[String] = x.split("=")
                if (kv.length < 2) (kv(0), "") else (kv(0), kv(1))
              }.sortBy { tuple =>
                // order parameters by numeric index ("u" first, then u1, u2, ...)
                if (TextUtils.isEmpty(tuple._1)) 0 else tuple._1.toInt
              }
              result
    
          }
        ).mapPartitions(iter =>
          iter.map { arr =>
            // key = value of the leading "u" parameter, used as the output directory name;
            // joining with tabs and cutting at the last tab drops the final field
            val k = arr(0)._2
            val line = arr.map(_._2).mkString("\t")
            (k, line.substring(0, line.lastIndexOf("\t")))
          }
        ).saveAsHadoopFile(HDFS_URL, classOf[String], classOf[String],
          classOf[RDDMultipleTextOutputFormat])
    
        val isTest = ConfigUtil.getValue("isTest")
        if(!"1".equals(isTest)){
          hdfs.delete(new Path(flumeLog),true)
          println("Delete flume log dir : %s".format(flumeLog))
        }
      }
    
      def main(args: Array[String]): Unit = {
        etl()
      }
    }
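
    Not shown in the article: how the job is launched. A minimal submit command might look like the following (the jar path is a made-up example; ConfigUtil is expected to provide appName, flumeLog, hdfs, hdfsWithPort and isTest, and the code pins the master to local[*], so no --master flag is passed):

    spark-submit --class com.kankan.tongji.NginxEtl /usr/local/cluster_app/jobs/nginx-etl.jar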
    
    

    5. Hive load script

    
    #!/bin/bash
    echo "================================================="
    # Load the previous hour's cleaned data into the matching Hive partition
    # for every table listed in tables.conf (one table name per line).
    dt=$(date -d "-1 hour" +'%Y%m%d %H')
    ds=${dt:0:8}
    hour=${dt:9:2}
    HIVE="/usr/local/cluster_app/hive/bin/hive -hiveconf mapred.job.name=kkstat_hive -hiveconf hive.groupby.skewindata=false -hiveconf hive.exec.compress.output=true -hiveconf hive.exec.compress.intermediate=true -hiveconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -hiveconf hive.map.aggr=true -hiveconf hive.stats.autogather=false -hiveconf mapred.job.queue.name=kankan"

    while read table
    do
      SQL="use kankan_odl; LOAD DATA INPATH '/user/kankan/warehouse/kankan_odl/${table}/ds=${ds}/hour=${hour}' INTO TABLE ${table} PARTITION (ds=${ds},hour=${hour})"
      echo "$SQL"
      $HIVE -e "$SQL"
    done < /usr/local/cluster_app/crontab-hive-load/tables.conf
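
    For reference, tables.conf is simply a list of Hive table names, one per line; the names below are invented for illustration:

    app_click
    app_start
    app_error

    A loaded partition can then be spot-checked with, for example, hive -e "use kankan_odl; select count(*) from app_click where ds=20191120 and hour=10;".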
    
    
