
Building a Data Reporting (Collection) System

Author: 你值得拥有更好的12138 | Published 2019-11-20 20:10

This post only sketches an approach; suggestions from more experienced readers are very welcome.

I. Physical Architecture

[Figure: physical architecture of the test environment]
This is the test environment; a production deployment would more likely look like the following.
[Figure: physical architecture of the production environment]
Below, only the test-environment setup is described.

II. Data Flow

1. Reporting is done via HTTP GET requests
2. The request parameters map one-to-one onto a Hive table (see the example request below)
3. Logs are cleaned with Spark

[Figure: data flow]
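
To make the mapping concrete, here is a hypothetical report request. The parameter layout is inferred from the Spark parsing code in step 4 and is an assumption, not something spelled out in the original post: u carries the target Hive table name, u1..uN carry the column values in order, and rd is a trailing random number that gets stripped during cleaning.

# Hypothetical example; host name, table name and values are made up for illustration
curl "http://test-nginx-host/?u=app_event&u1=12345&u2=click&u3=1574250600&rd=0.48372"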

The Code

The code breaks down into the following steps, which can be orchestrated with a scheduling tool such as Oozie.
1. Nginx log rotation

#!/bin/bash
YESTERDAY=$(date -d "yesterday" +"%Y%m%d")
LOGPATH=/usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/

PID=${LOGPATH}nginx.pid
mv ${LOGPATH}access.log ${LOGPATH}history/access-${YESTERDAY}.log
mv ${LOGPATH}error.log ${LOGPATH}history/error-${YESTERDAY}.log

# Tell nginx to reopen its log files so new requests go to a fresh access.log/error.log
kill -USR1 `cat ${PID}`
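
The rotation script can be scheduled with cron (or Oozie); a minimal sketch, assuming the script is saved under a hypothetical path:

# Hypothetical crontab entry: rotate the nginx logs right after midnight,
# so "yesterday" in the script refers to the log that has just been closed
0 0 * * * /bin/bash /usr/local/cluster_app/scripts/rotate-nginx-log.sh >> /usr/local/cluster_app/scripts/rotate-nginx-log.out 2>&1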

2. Flume load-balancing configuration (agent tailing the nginx log)

a1.sources = nginxLog
a1.channels = memoryChannel
a1.sinks = sink1 sink2
a1.sinkgroups = sinkGroup
a1.sinkgroups.sinkGroup.sinks = sink1 sink2

a1.sources.nginxLog.type = exec
a1.sources.nginxLog.command = tail -F /usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/access.log

a1.sources.nginxLog.channels = memoryChannel


a1.sinks.sink1.type = avro
a1.sinks.sink1.channel = memoryChannel
a1.sinks.sink1.hostname = Test-Hadoop-003 
a1.sinks.sink1.port = 4545

a1.sinks.sink2.type = avro
a1.sinks.sink2.channel = memoryChannel
a1.sinks.sink2.hostname = Test-Hadoop-004 
a1.sinks.sink2.port = 4545

# Round-robin load balancing across the two avro sinks
a1.sinkgroups.sinkGroup.processor.type = load_balance
a1.sinkgroups.sinkGroup.processor.backoff = true
a1.sinkgroups.sinkGroup.processor.selector = round_robin
a1.sinkgroups.sinkGroup.processor.selector.maxTimeOut = 30000

# Memory channel sizing
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 500000
a1.channels.memoryChannel.transactionCapacity = 10000
a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
a1.channels.memoryChannel.byteCapacity = 800000
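
A sketch of how this agent could be started (the conf-file name and paths are assumptions, not from the original post):

# Start the load-balancing agent defined above
nohup flume-ng agent \
  --conf /usr/local/cluster_app/flume/conf \
  --conf-file /usr/local/cluster_app/flume/conf/nginx-loadbalance.conf \
  --name a1 > /usr/local/cluster_app/flume/logs/agent.out 2>&1 &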

3. Flume sink configuration (identical on both downstream machines)

a1.sources = avroSrc
a1.channels = memoryChannel
a1.sinks = k1

# For each one of the sources, the type is defined
a1.sources.avroSrc.type = avro
a1.sources.avroSrc.bind = 0.0.0.0
a1.sources.avroSrc.port = 4545

# Each sink's type must be defined
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://36.251.250.6:8020/flume/kankan/app/ds=%Y%m%d/hour=%H/
a1.sinks.k1.hdfs.filePrefix = nginx-%Y%m%d%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 5120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType =  DataStream
a1.sinks.k1.sink.serializer = Text
a1.sinks.k1.sink.serializer.appendNewline = true


# Each channel's type is defined.
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 500000
a1.channels.memoryChannel.transactionCapacity = 1000
a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
a1.channels.memoryChannel.byteCapacity = 5000000000

a1.sinks.k1.channel = memoryChannel
a1.sources.avroSrc.channels = memoryChannel
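
To sanity-check that events are landing where the Spark job expects them, the current hour's directory can be listed on HDFS (path taken from the sink configuration above):

hdfs dfs -ls hdfs://36.251.250.6:8020/flume/kankan/app/ds=$(date +%Y%m%d)/hour=$(date +%H)/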

4. Spark cleaning code (adapt to your own requirements)

package com.kankan.tongji

import java.net.URI
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.http.util.TextUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Writes each record under a sub-directory named after its key (here: the Hive table name).
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
}
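
// NOTE: etl() below calls RDDMultipleTextOutputFormat.getFrontHour(), whose implementation
// is not shown in the post. A minimal reconstruction, assuming it returns the previous hour
// formatted as "yyyyMMdd HH" to match the Flume ds=%Y%m%d/hour=%H layout:
object RDDMultipleTextOutputFormat {
  def getFrontHour(): String =
    LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyyMMdd HH"))
}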

object NginxEtl {
  // ConfigUtil is the project's configuration reader (implementation not shown in this post)
  private val APP_NAME = ConfigUtil.getValue("appName")
  private val FLUME_LOG_DIR = ConfigUtil.getValue("flumeLog")
  private val HDFS_URL = ConfigUtil.getValue("hdfs")
  private val HDFS_URL_PORT = ConfigUtil.getValue("hdfsWithPort")

  def etl(): Unit = {
    val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[*]")
    val sc = new SparkContext(conf)
    val currentTime = RDDMultipleTextOutputFormat.getFrontHour().split(" ")
    val date = currentTime(0)
    val hour = currentTime(1)
    var log: RDD[String] = null
    val flumeLog = FLUME_LOG_DIR + "ds=%s/hour=%s/".format(date,hour)
    val hdfs = FileSystem.get(
      new URI(HDFS_URL_PORT), new Configuration())

    if(!hdfs.exists(new Path(flumeLog))){
      println("Hdfs directory is't exist,check it:%s.".format(flumeLog))
      return
    }

    log = sc.textFile(flumeLog)
    log.filter(_.nonEmpty)                              // drop empty lines
      .filter(_.split(" ")(6).startsWith("/?u="))       // keep only report requests: field 7 of the nginx log is the request path
      .mapPartitions(partition =>
      partition.map {
        lg =>
          val url: String = lg.split(" ")(6)
          // strip the trailing &rd=... part and split the remaining query string on its "u" parameters
          val arr: Array[String] = url.replace("/?", "&").split("&rd")(0).split("&u")
          // drop the leading empty element produced by the split
          val mid = arr.takeRight(arr.length - 1)
          val result = mid.map {
            x =>
              var kv: Array[String] = x.split("=")
              if (kv.length < 2)
                (kv(0), "")
              else
                (kv(0), kv(1))
          }.sortBy { // order the values by their numeric parameter index (the empty index sorts first)
            tuple =>
              var comparable = -1
              if (TextUtils.isEmpty(tuple._1))
                comparable = 0
              else
                comparable = tuple._1.toInt
              comparable
          }
          result

      }
    ).mapPartitions(
      iter =>
        iter.map { arr =>
          // the first parameter's value becomes the output sub-directory (i.e. the Hive table name)
          val k = arr(0)._2
          val line = arr.map(_._2).mkString("\t")
          // drop everything after the last tab (the final field) and emit a tab-separated record
          (k, line.substring(0, line.lastIndexOf("\t")))
        }
    ).saveAsHadoopFile(HDFS_URL, classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])

    val isTest = ConfigUtil.getValue("isTest")
    if(!"1".equals(isTest)){
      hdfs.delete(new Path(flumeLog),true)
      println("Delete flume log dir : %s".format(flumeLog))
    }
  }

  def main(args: Array[String]): Unit = {
    etl()
  }
}
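
A submission sketch for the job above; the jar name and path are assumptions. Note that the code hard-codes setMaster("local[*]"), so it runs in local mode regardless of what is passed to --master; remove that call if you want to run it on YARN.

# Hypothetical hourly submission of the cleaning job
spark-submit \
  --class com.kankan.tongji.NginxEtl \
  --master "local[*]" \
  /usr/local/cluster_app/jobs/nginx-etl.jar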

5. Hive load script


#!/bin/bash
echo "================================================="
# previous hour, e.g. "20191120 19"
dt=$(date -d "-1 hour" +'%Y%m%d %H')
ds=${dt:0:8}
hour=${dt:9:2}
HIVE="/usr/local/cluster_app/hive/bin/hive -hiveconf mapred.job.name=kkstat_hive -hiveconf hive.groupby.skewindata=false -hiveconf hive.exec.compress.output=true -hiveconf hive.exec.compress.intermediate=true  -hiveconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -hiveconf hive.map.aggr=true -hiveconf hive.stats.autogather=false -hiveconf mapred.job.queue.name=kankan"

while read table
do
  echo "use kankan_odl;LOAD DATA  INPATH '/user/kankan/warehouse/kankan_odl/${table}/ds=${ds}/hour=${hour}' INTO TABLE ${table} PARTITION (ds=${ds},hour=${hour})"
  $HIVE -e "use kankan_odl;LOAD DATA  INPATH '/user/kankan/warehouse/kankan_odl/${table}/ds=${ds}/hour=${hour}' INTO TABLE ${table} PARTITION (ds=${ds},hour=${hour})"
done < /usr/local/cluster_app/crontab-hive-load/tables.conf
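
tables.conf is assumed to contain one Hive table name per line. For completeness, a hypothetical table definition that would line up with the tab-separated output of the Spark job and with the ds/hour partitions used by LOAD DATA above (table and column names are placeholders):

# Create a target table once, before the hourly loads start (placeholder columns)
/usr/local/cluster_app/hive/bin/hive -e "
USE kankan_odl;
CREATE TABLE IF NOT EXISTS app_event (
  col1 STRING,
  col2 STRING,
  col3 STRING
)
PARTITIONED BY (ds STRING, hour STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
"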
