美文网首页
flume 3个案例练习

flume 3个案例练习

作者: 夜希辰 | 来源:发表于2021-11-27 10:58 被阅读0次

    备注:flume采集文件到hdfs还报错,后续在更新。
    自己还是喜欢+适合用写文章的方式来学习,很就没更新文章了。加油成为更好的自己,努力学习、努力赚钱、努力理财
    flume是一个分布式的、高可靠的、高可用的将大批量的不同数据源的日志数据收集、集合、移动到数据中心进行存储的系统。即
    日志采集和汇总工具。

    一、监控端口数据

    案例需求:使用flume监控一个端口号,并打印到控制台

    1、cd /usr/flume/conf 在conf目录下新建netcat-flume-logger.conf文件
    # example.conf: A single-node Flume configuration
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    整个配置文件内容分为三个大部分:

    • 从整体上描述代理agent中sources、sinks、channels所涉及到的组件;
    • 详细描述agent中每一个source、sink与channel的具体实现;Source使用了netcat,指定绑定的主机以及端口号;Sink按照logger的方式进行配置
    • 通过channel将source与sink连接起来
    2、启动Flume

    cd /usr/flume,在flume目录下执行命名

    bin/flume-ng agent --conf conf --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
    

    参数说明:
    --conf : 表示配置文件存储在conf目录
    --name : 表示给agent取一个名字
    --conf-file : flume本次启动读取的配置文件是在job文件夹下面的netcat-flume-logger.conf 文件
    -Dflume.root.logger=INFO,console : -D 表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。
    日志级别包括:log、info 、warn

    3、监听端口并输入一些测试信息
    nc localhost 44444
    

    二、实时监控单个追加的文件,并上传到HDFS

    案例需求:实时监控hive日志并上传到hdfs

    步骤操作分为两步:首先把动态的文件打印到控制台,因为打印到控制台比较熟悉;然后在监控动态的文件把数据传到HDFS

    第一步:把动态的文件打印到控制台

    1、cd /usr/flume/conf 在conf目录下新建file-flume-logger.conf文件
    #name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #source config 
    a1.sources.r1.type = exec
    #监控本地文件tail -f,tail -f默认读取文件的后10行。
    a1.sources.r1.command = tail -f /home/atguigu/data/flume_test.txt
    
    #channels config 
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #sink cinfig
    a1.sinks.k1.type = logger
    
    #bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1 
    
    • 可自行查看tail -ftail -F的区别
    2 、启动flume

    1)cd /usr/flume,在flume目录下执行命名

    bin/flume-ng agent --conf conf --conf-file job/file-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
    
    3、往文件追加内容 ,并输入一些测试信息
    echo "bbccdd" >> flume_test.txt
    

    查看flume

    第二步:监控动态的文件把数据传到HDFS

    有问题,上传不到hdfs,还没找到原因
    flume想把数据写到hdfs上,需要持有hadoop相关的jar包。flume启动时会加载lib目录下的所有jar包到环境变量中。

    1、cd /usr/flume/conf 在conf目录下新建file-flume-hdfs.conf文件
    #1)name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #2)sources configure
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/atguigu/data/flume_test.txt
    
    #3)sinks configure
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://bigdata02:9092/flume/events/%y-%m-%d/%H
    #文件的前缀
    a1.sinks.k1.hdfs.filePrefix = log-
    #是否按照时间滚动文件夹,设置为按照时间滚动文件夹
    a1.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a1.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a1.sinks.k1.hdfs.roundUnit = hour
    #积攒多少个event才刷新到hdfs
    a1.sinks.k1.hdfs.batchSize = 10
    #文件的压缩格式
    a1.sinks.k1.hdfs.fileType = DataStream
    
    
    #多久生成一个新文件,因为是测试环境,设置为60秒生成一个新文件
    a1.sinks.k1.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a1.sinks.k1.hdfs.hdfs.rollSize = 13421770
    #设置文件的滚动与数量无关
    a1.sinks.k1.hdfs.rollCount = 0
    
    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    
    #4)channels configure
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #5)bind configure
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    hdfs sink配置参数

    参考文章:hdfs sink 使用技巧
    1)文件滚动参数设置
    如果要配置文件滚动,hdfs.rollIntervalhdfs.rollSizehdfs.rollCount这三个参数需要一起配置

    hdfs.rollInterval  :
    基于时间间隔来进行文件滚动(多久生成一个新文件),默认是30,即每隔30秒滚动一个文件。0就是不使用这个策略。
    hdfs.rollSize :
    基于文件大小进行文件滚动,默认是1024,即当文件大于1024个字节时,关闭当前文件,创建新的文件。0就是不使用这个策略。
    一般文件大小设置要比块小
    hdfs.rollCount
    基于event数量进行文件滚动。默认是10,即event个数达到10时进行文件滚动。0就是不使用这个策略。
    hdfs.idleTimeout
    闲置N秒后,关闭当前文件(去掉.tmp后缀)
    

    2)文件名策略

    hdfs.filePrefix
    文件前缀,默认是FlumeData
    hdfs.fileSuffix
    文件后缀,默认没有
    

    三、实时监控文件夹,并上传到HDFS

    使用组件:taildir sourcememroy channelhdfs sink

    1、cd /usr/flume/conf 在conf目录下新建taildir-file-hdfs.conf文件
    #每行注释在代码下方
    a3.sources = r3
    #定义sources
    a3.sinks = k3
    #定义sinks
    a3.channels = c3 
    #定义channels
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir 
    #表示定义source的类型是目录类型
    a3.sources.r3.spoolDir = /home/study/hive_project/logstart 
    #定义监控目录的具体位置
    a3.sources.r3.fileSuffix = .COMPLETED 
    #文件上传完毕后的后缀
    a3.sources.r3.fileHeader = true  
    #表示是否有文件头
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp) 
    #忽略所有以tmp结尾的文件,不上传
    
    # Describe the sink
    a3.sinks.k3.type = hdfs 
    #sink的类型是hdfs
    a3.sinks.k3.hdfs.path = /origin_data/mall/logstart/%Y-%m-%d  
    #数据目的地的具体路径
    a3.sinks.k3.hdfs.filePrefix = logstart-   
    #上传的文件以logstart前缀
    a3.sinks.k3.hdfs.round = true  
    #是否按照时间滚动文件
    a3.sinks.k3.hdfs.roundValue = 1  
    #多少时间单位创建一个新的文件
    a3.sinks.k3.hdfs.roundUnit = hour  
    #时间的单位 小时
    a3.sinks.k3.hdfs.useLocalTimeStamp = true  
    #是否使用本地的时间戳
    a3.sinks.k3.hdfs.batchSize = 100   
    #积累了多少event才能刷写到hdfs一次
    a3.sinks.k3.hdfs.fileType = DataStream  
    #设置文件类型
    a3.sinks.k3.hdfs.rollInterval = 600   
    #多久生成新文件
    a3.sinks.k3.hdfs.rollSize = 134217700  
    #生成多大的新文件
    a3.sinks.k3.hdfs.rollCount = 0   
    #多少enevt生成新文件
    a3.sinks.k3.hdfs.minBlockReplicas = 1   
    #多少副本数
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory  
    #表示a3的类型是channel类型是menory(内存)类型
    a3.channels.c3.capacity = 1000 
    #表示a3的channel的总容量为1000个enevt
    a3.channels.c3.transactionCapacity = 100  
    #表示a3的channel在传输的时候收集到100个enevt再去提交事务
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3  
    #表示把r3和c3连接起来
    a3.sinks.k3.channel = c3  
    #表示将K3和c3连接起来
    
    2 、启动flume

    cd /usr/flume,在flume目录下执行命名

    bin/flume-ng agent --conf conf --conf-file job/taildir-file-hdfs.conf --name a3
    
    3、查看采集信息

    1)监控文件夹


    2)HDFS上传数据

    相关文章

      网友评论

          本文标题:flume 3个案例练习

          本文链接:https://www.haomeiwen.com/subject/gllitrtx.html