美文网首页
Apache Flume基础(三)简单案例

Apache Flume基础(三)简单案例

作者: 做个合格的大厂程序员 | 来源:发表于2020-06-25 21:48 被阅读0次

    采集目录到 HDFS

    采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新文件出现, 就需要把文件采集到 HDFS 中去

    • 采集源,即 source——监控文件目录 : spooldir
    • 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
    • source 和 sink 之间的传递通道——channel,可用 file channel 也可以用 内存 channel
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    ##注意:不能往监控目中重复丢同名文件
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/logs2
    a1.sources.r1.fileHeader = true
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    Channel 参数解释:

    capacity:默认该通道中最大的可以存储的 event 数量
    trasactionCapacity:每次最大可以从 source 中拿到或者送到 sink 中的 event 数量

    采集文件到 HDFS

    采集需求:比如业务系统使用 log4j 生成的日志,日志内容不断增加,需要把追 加到日志文件中的数据实时采集到 hdfs

    根据需求,首先定义以下 3 大要素

    • 根据需求,首先定义以下 3 大要素
    • 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
    • Source 和 sink 之间的传递通道——channel,可用 file channel 也可以用 内存 channel

    配置文件编写:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/logs/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H-%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    参数解析

    rollInterval

    默认值:30

    hdfs sink 间隔多长将临时文件滚动成最终目标文件,单位:秒;

    如果设置成 0,则表示不根据时间来滚动文件;

    注:滚动(roll)指的是,hdfs sink 将临时文件重命名成最终目标文

    件,并新打开一个临时文件来写入数据;

    rollSize

    默认值:1024

    当临时文件达到该大小(单位:bytes)时,滚动成目标文件;

    如果设置成 0,则表示不根据临时文件大小来滚动文件;

    rollCount

    默认值:10

    当 events 数据达到该数量时候,将临时文件滚动成目标文件;

    如果设置成 0,则表示不根据 events 数据来滚动文件;

    round

    默认值:false

    是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。

    roundValue

    默认值:1

    时间上进行“舍弃”的值;

    roundUnit

    默认值:seconds

    时间上进行“舍弃”的单位,包含:second,minute,hour

    示例:

    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

    a1.sinks.k1.hdfs.round = true

    a1.sinks.k1.hdfs.roundValue = 10

    a1.sinks.k1.hdfs.roundUnit = minute

    当时间为 2015-10-16 17:38:59 时候,hdfs.path 依然会被解析为:

    /flume/events/20151016/17:30/00

    因为设置的是舍弃 10 分钟内的时间,因此,该目录每 10 分钟新生成一个

    相关文章

      网友评论

          本文标题:Apache Flume基础(三)简单案例

          本文链接:https://www.haomeiwen.com/subject/yteefktx.html