Flume: Analysis of an Example (Part 1)

Author: 博弈史密斯 | Published 2018-07-19 17:16

    Data collection scheme


    Use a taildir source to read the nginx logs in real time.

    The Agent tier collects logs on each host; the collector tier receives the logs sent by the Agent tier and routes the data.

    Physical architecture diagram


    Each Flume agent sends its events to two Flume collectors in a load-balanced fashion.

    The concrete Flume configuration:

    Agent:
    ngixAgent.sources = tail_dir_source
    ngixAgent.channels = memory_channel
    ngixAgent.sinks = sink1 sink2
    
    
    # Configure source
    ngixAgent.sources.tail_dir_source.type = org.apache.flume.source.taildir.TaildirSource
    ngixAgent.sources.tail_dir_source.channels = memory_channel
    # NOTE: restartThrottle (restart interval in ms), restart (re-run the
    # command after it exits), and logStdErr are Exec-source options;
    # the Taildir source ignores them.
    ngixAgent.sources.tail_dir_source.restartThrottle = 10000
    ngixAgent.sources.tail_dir_source.restart = true
    ngixAgent.sources.tail_dir_source.logStdErr = true
    ngixAgent.sources.tail_dir_source.positionFile = /home/logsss/positionfile/taildir_position.json
    ngixAgent.sources.tail_dir_source.filegroups = f1
    ngixAgent.sources.tail_dir_source.filegroups.f1 = /usr/local/ver01/tengine/logs/access.log
    # up to batchSize events are handled per batch; keep batchSize <= transactionCapacity <= capacity
    ngixAgent.sources.tail_dir_source.batchSize = 100
    ngixAgent.sources.tail_dir_source.backoffSleepIncrement  = 1000
    ngixAgent.sources.tail_dir_source.maxBackoffSleep  = 5000
    ngixAgent.sources.tail_dir_source.recursiveDirectorySearch = true
    ngixAgent.sources.tail_dir_source.yarnApplicationHeader = true
    ngixAgent.sources.tail_dir_source.yarnContainerHeader = true
    
    
    # Configure channel
    ngixAgent.channels.memory_channel.type = memory
    # The maximum number of events the channel will take from a source or give to a sink per transaction
    ngixAgent.channels.memory_channel.transactionCapacity = 50000
    # The maximum number of events stored in the channel
    ngixAgent.channels.memory_channel.capacity = 100000
    
    
    # Configure sinks
    ngixAgent.sinks.sink1.channel = memory_channel
    ngixAgent.sinks.sink1.type = avro
    ngixAgent.sinks.sink1.hostname = 172.31.38.39
    ngixAgent.sinks.sink1.port = 44447
    ngixAgent.sinks.sink1.connect-timeout = 120000
    ngixAgent.sinks.sink1.request-timeout = 120000
    ngixAgent.sinks.sink2.channel = memory_channel
    ngixAgent.sinks.sink2.type = avro
    ngixAgent.sinks.sink2.hostname = 172.31.58.11 
    ngixAgent.sinks.sink2.port = 44447
    ngixAgent.sinks.sink2.connect-timeout = 120000
    ngixAgent.sinks.sink2.request-timeout = 120000
    
    
    # Configure load_balance
    ngixAgent.sinkgroups = g1
    ngixAgent.sinkgroups.g1.sinks = sink1 sink2
    ngixAgent.sinkgroups.g1.processor.type = load_balance
    # NOTE: processor.priority.* belongs to the failover processor;
    # the load_balance processor ignores these two lines.
    ngixAgent.sinkgroups.g1.processor.priority.sink1 = 7
    ngixAgent.sinkgroups.g1.processor.priority.sink2 = 9
    ngixAgent.sinkgroups.g1.processor.backoff = true
    ngixAgent.sinkgroups.g1.processor.selector = round_robin
    ngixAgent.sinkgroups.g1.processor.selector.maxTimeOut=30000
    
    Collector:
    nginx-collector.sources = src1
    nginx-collector.channels = ch1 ch3
    nginx-collector.sinks = hdfssink wrongpathsink
    
    
    # Configure source: src1
    nginx-collector.sources.src1.channels = ch1 ch3 
    nginx-collector.sources.src1.type = avro
    nginx-collector.sources.src1.restartThrottle = 10000
    nginx-collector.sources.src1.restart = true
    nginx-collector.sources.src1.logStdErr = true
    nginx-collector.sources.src1.bind = 0.0.0.0
    nginx-collector.sources.src1.port = 44447
    nginx-collector.sources.src1.threads = 12
    
    # Define the source selector
    nginx-collector.sources.src1.selector.type = multiplexing
    nginx-collector.sources.src1.selector.header = dataroute
    # events whose "dataroute" header is "wrong" take the error path via ch3
    nginx-collector.sources.src1.selector.mapping.wrong = ch3
    # events whose "dataroute" header is "right" take the normal path via ch1
    nginx-collector.sources.src1.selector.mapping.right = ch1
    # events with any other header value default to ch3, the error path
    # (split by year/month/day on HDFS)
    nginx-collector.sources.src1.selector.default = ch3
    
    
    # Define the Flume interceptors
    nginx-collector.sources.src1.interceptors = i1 i2 i3
    #nginx-collector.sources.src1.interceptors.i1.type=org.flume.glbg.interceptors.EventTimeInterceptor$Builder
    nginx-collector.sources.src1.interceptors.i1.type=com.glbg.flume.interceptors.LogAnalysisInterceptor$Builder
    
    nginx-collector.sources.src1.interceptors.i1.preserveExisting = false
    nginx-collector.sources.src1.interceptors.i2.type = timestamp  
    nginx-collector.sources.src1.interceptors.i3.type=static
    nginx-collector.sources.src1.interceptors.i3.key=topic
    nginx-collector.sources.src1.interceptors.i3.preserveExisting = false
    nginx-collector.sources.src1.interceptors.i3.value=glbg-analitic
    
    
    nginx-collector.channels.ch1.type = file
    nginx-collector.channels.ch1.transactionCapacity = 50000
    nginx-collector.channels.ch1.capacity = 1000000
    nginx-collector.channels.ch1.checkpointDir = /opt/cloudera/flumelog/hdfs/file/checkpoint
    nginx-collector.channels.ch1.dataDirs = /opt/cloudera/flumelog/hdfs/file/data
    
    nginx-collector.channels.ch3.type = memory
    nginx-collector.channels.ch3.transactionCapacity = 50000
    nginx-collector.channels.ch3.capacity = 1000000
    
    
    nginx-collector.sinks.hdfssink.type=hdfs
    nginx-collector.sinks.hdfssink.hdfs.path=hdfs://glbgnameservice/globalegrow/nginx-log/%{YEAR}/%{MONTH}/%{DAY}/%{ubcd}/%{logtype}
    nginx-collector.sinks.hdfssink.channel=ch1
    nginx-collector.sinks.hdfssink.hdfs.fileType = DataStream
    nginx-collector.sinks.hdfssink.hdfs.filePrefix = nginx.%{YEAR}-%{MONTH}-%{DAY}
    nginx-collector.sinks.hdfssink.hdfs.fileSuffix=.log
    nginx-collector.sinks.hdfssink.hdfs.minBlockReplicas=1
    nginx-collector.sinks.hdfssink.hdfs.rollInterval=0
    # roll to a new file once it reaches this size (just under 128 MB)
    nginx-collector.sinks.hdfssink.hdfs.rollSize=132692539
    nginx-collector.sinks.hdfssink.hdfs.idleTimeout=300 
    nginx-collector.sinks.hdfssink.hdfs.batchSize=500
    nginx-collector.sinks.hdfssink.hdfs.rollCount=0
    nginx-collector.sinks.hdfssink.hdfs.round=true
    nginx-collector.sinks.hdfssink.hdfs.roundValue=10
    nginx-collector.sinks.hdfssink.hdfs.roundUnit=minute
    nginx-collector.sinks.hdfssink.hdfs.threadsPoolSize=60
    nginx-collector.sinks.hdfssink.hdfs.rollTimerPoolSize=6
    nginx-collector.sinks.hdfssink.hdfs.useLocalTimeStamp=true
    
    
    
    nginx-collector.sinks.wrongpathsink.type=hdfs
    nginx-collector.sinks.wrongpathsink.hdfs.path=hdfs://glbgnameservice/wrongpath/%Y/%m/%d/%{logtype}
    nginx-collector.sinks.wrongpathsink.channel=ch3
    nginx-collector.sinks.wrongpathsink.hdfs.fileType = DataStream
    nginx-collector.sinks.wrongpathsink.hdfs.filePrefix = nginx.%Y-%m-%d
    nginx-collector.sinks.wrongpathsink.hdfs.fileSuffix=.log
    nginx-collector.sinks.wrongpathsink.hdfs.minBlockReplicas=1
    nginx-collector.sinks.wrongpathsink.hdfs.rollInterval=0
    nginx-collector.sinks.wrongpathsink.hdfs.rollSize=132692539
    nginx-collector.sinks.wrongpathsink.hdfs.idleTimeout=600 
    nginx-collector.sinks.wrongpathsink.hdfs.batchSize=100
    nginx-collector.sinks.wrongpathsink.hdfs.rollCount=0
    nginx-collector.sinks.wrongpathsink.hdfs.round=true
    nginx-collector.sinks.wrongpathsink.hdfs.roundValue=2
    nginx-collector.sinks.wrongpathsink.hdfs.roundUnit=minute
    nginx-collector.sinks.wrongpathsink.hdfs.threadsPoolSize=30
    nginx-collector.sinks.wrongpathsink.hdfs.rollTimerPoolSize=3
    nginx-collector.sinks.wrongpathsink.hdfs.useLocalTimeStamp=true
    
    processor

    A channel can feed multiple sinks; there are two sink-group use cases: load balancing and failover.
    Load balancing: use two sinks to send data to two Flume nodes, reducing the pressure on each collector node. See the Agent configuration above.

    backoff: when enabled, a failed sink is blacklisted and retried only after a wait; on repeated failures the wait grows exponentially, up to a maximum. When disabled, a failed sink is retried on every selection.

    selector.maxTimeOut: the maximum blacklist (back-off) time, in milliseconds.
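    The second sink-group use case, failover, is not shown above. A minimal sketch under assumed names (agent a1, sinks k1 and k2), using the standard failover processor: events always go to the highest-priority live sink.

    ```properties
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    # failover (not load_balance): always use the highest-priority live sink
    a1.sinkgroups.g1.processor.type = failover
    # larger number = higher priority, so k1 is preferred
    a1.sinkgroups.g1.processor.priority.k1 = 10
    a1.sinkgroups.g1.processor.priority.k2 = 5
    # upper bound on the back-off penalty for a failed sink, in milliseconds
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    ```

    If k1 goes down, events flow to k2; k1 is retried once its penalty expires.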

    source selector

    There are two selector types: Replicating and Multiplexing.

    • Replicating
      copies each Event into every Channel.
    • Multiplexing
      routes each Event to a Channel based on an attribute in its header.
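    The Multiplexing variant appears in the Collector configuration above; a minimal Replicating sketch, under assumed names (agent a1, source r1, channels c1 and c2):

    ```properties
    a1.sources.r1.channels = c1 c2
    # copy every event into both c1 and c2
    a1.sources.r1.selector.type = replicating
    # optional: a write failure on c2 will not fail the transaction
    a1.sources.r1.selector.optional = c2
    ```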
    event
    1. An event is the basic unit of message handling in Flume; it consists of zero or more headers and a body.
    2. Headers are key/value pairs that can drive routing decisions or carry other structured information (such as the event timestamp or the hostname of the originating server). Think of them as serving the same purpose as HTTP headers: transmitting extra information beyond the body.
    3. The body is a byte array containing the actual payload.
    Flume interceptors (Interceptor)
    Timestamp Interceptor:

    The timestamp interceptor adds the current timestamp in milliseconds to the event header, under the key timestamp:

    a1.sources = s1
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = timestamp
    

    Usage example:

    a1.sinks = k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
    # file name prefix
    a1.sinks.k1.hdfs.filePrefix = log_
    # file name suffix
    a1.sinks.k1.hdfs.fileSuffix = .log
    

    With the HDFS sink, hdfs.path = hdfs://ns1/flume/%Y%m%d is the storage path on HDFS. The %Y%m%d escape sequences expand to a concrete date, and they require the event header to contain a timestamp key.
    In other words, each event is written to the file for the date recorded in its header (for example log_20180619.log).

    Host Interceptor:

    The host interceptor adds the hostname or IP address of the machine running the Flume agent to the event headers. The key is host by default (renamed here via hostHeader); the value is the machine's hostname or IP.

    a1.sources.s1.interceptors.i1.type = host
    a1.sources.s1.interceptors.i1.hostHeader = hostname
    

    Example:

    a1.sinks = k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
    a1.sinks.k1.hdfs.filePrefix =  log_%{hostname}
    

    This produces a file such as log_zyb01.log (zyb01 being the hostname) in the corresponding HDFS directory.

    Static Interceptor:

    The static interceptor adds a fixed, user-defined key and value to the event header.

    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = static_key
    a1.sources.r1.interceptors.i1.value = static_value
    

    Example:

    a1.sinks = k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
    a1.sinks.k1.hdfs.filePrefix =  log_%{static_key}
    

    This produces a file on HDFS whose name starts with log_static_value: the %{static_key} escape expands to the header's value, not its key.

    Regex Filtering Interceptor:

    This interceptor filters events by matching the raw event body against a regular expression.

    Example configuration:

    ## source interceptor
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex = ^lxw1234.*
    a1.sources.r1.interceptors.i1.excludeEvents = false
     
    # sink 配置 
    a1.sinks.k1.type = logger
    

    With excludeEvents = false, this configuration drops every event that does not start with lxw1234.

    If excludeEvents is set to true, it instead drops the events that do start with lxw1234.

    Regex Extractor Interceptor:
    This interceptor applies a regular expression to the raw event body and adds the captured groups to the event headers.
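    A minimal sketch (the agent and interceptor names, the regex, and the header names one/two/three are all illustrative): three colon-separated digits are captured from the body and stored as three headers.

    ```properties
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_extractor
    # capture three colon-separated digits from the event body
    a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
    # one serializer per capture group; "name" sets the header key
    a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
    a1.sources.r1.interceptors.i1.serializers.s1.name = one
    a1.sources.r1.interceptors.i1.serializers.s2.name = two
    a1.sources.r1.interceptors.i1.serializers.s3.name = three
    ```

    An event whose body starts with 1:2:3 would gain the headers one=1, two=2, and three=3.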

Source: https://www.haomeiwen.com/subject/vyvseftx.html