Getting Started with Flume

Author: FantJ | Published 2018-07-29 15:15

    Flume is a distributed log collection system. It gathers data from individual servers and delivers it to a destination you specify, such as a file or HDFS.

    Installation

    tar -zxvf apache-flume-1.6.0-bin.tar.gz
    

    Configure environment variables

    export FLUME_HOME=/xxx/flume
    export PATH=$PATH:$FLUME_HOME/bin
    
    Edit flume-env.sh in the conf directory and set JAVA_HOME there.
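
    A minimal sketch of that flume-env.sh entry. The JDK path below is a hypothetical placeholder, not taken from the original setup; replace it with the JDK directory on your machine. The Flume binary distribution ships conf/flume-env.sh.template, which can be copied to conf/flume-env.sh as a starting point.

```shell
# conf/flume-env.sh
# JAVA_HOME below is a hypothetical path; point it at your own JDK directory.
export JAVA_HOME=/usr/lib/jvm/java
```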

    Verify

    flume-ng  version
    
    
    [root@s166 log]# flume-ng version
    Flume 1.6.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
    Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
    From source with checksum b29e416802ce9ece3269d34233baf43f
    

    That completes the environment setup.

    Example 1: Listen on a specified network port

    1.1 Configuration file
    The Flume documentation describes the NetCat Source as follows:
    
    Property Name   Default   Description
    channels        –
    type            –         The component type name, needs to be netcat
    bind            –         Host name or IP address that logs are sent to; the netcat source listens on this address
    port            –         Port number that logs are sent to; the netcat source listens on this port
    

    Then create a configuration file named netcat-logger.conf in the flume/conf directory:

    # Name the components of this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe and configure the source: r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe and configure the sink: k1
    a1.sinks.k1.type = logger
    
    # Describe and configure the channel; here events are buffered in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Wire the source, channel, and sink together
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

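    This configuration makes the agent listen on port 44444 of localhost and write every received line to the log.

    1.2 Start the agent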

    flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

    -c conf   directory that holds Flume's own configuration files
    -f conf/netcat-logger.conf  the collection plan we just described
    -n a1  name of this agent
    
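    1.3 Test

    In another terminal, run nc localhost 44444. (If nc is not installed, run yum install nmap-ncat.x86_64; if that package cannot be found, switch to the Aliyun yum mirror first, see "Redhat7.x: switching to the Aliyun yum source".)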

    [root@s166 log]# nc localhost 44444
    hello
    OK
    fantj
    OK
    

    Then check the output on the Flume agent side:

     18:48:48 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
     18:48:49 INFO sink.LoggerSink: Event: { headers:{} body: 66 61 6E 74 6A                                  fantj }
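
    The same test can be scripted instead of typed interactively. The sketch below assumes a hypothetical helper named send_line; its host and port defaults are the values from netcat-logger.conf. Whether nc exits by itself after the reply depends on the nc variant installed, so interrupt it with Ctrl-C if it keeps the connection open.

```shell
# Sketch: send one line to the netcat source and print whatever it replies.
# send_line is a hypothetical helper, not part of Flume.
send_line() {
    line="$1"
    host="${2:-localhost}"   # default matches a1.sources.r1.bind
    port="${3:-44444}"       # default matches a1.sources.r1.port
    printf '%s\n' "$line" | nc "$host" "$port"
}
```

    Calling send_line hello should produce the same OK reply as in the interactive session, and the agent should log one more Event line.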
    

    Example 2: Watch a specified directory and collect every new file that appears into HDFS

    sources.type:  spooldir
    sinks.type: hdfs
    
    2.1 Configuration file

    Create a configuration file named spooldir.conf in the flume/conf directory:

    # Names of the three components
    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Configure the source (file names in the watched directory must not be reused)
    
    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /home/fantj/log/
    agent1.sources.source1.fileHeader = false
    
    # Configure the interceptor
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Configure the sink
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path =hdfs://s166/weblog/flume-collection/%y-%m-%d/
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
    

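    In short: the agent watches the /home/fantj/log/ directory and uploads every new file that appears there to hdfs://s166/weblog/flume-collection/%y-%m-%d/, where %y-%m-%d expands to the two-digit year, the month, and the day.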
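
    The spooling directory source expects every file to be complete and unchanged once it lands in the watched directory, and it expects file names never to be reused. One way to respect both rules is to finish writing the file elsewhere and then move it in under a unique name. The sketch below assumes a hypothetical helper deliver_to_spool and an illustrative naming scheme (timestamp plus shell PID); neither comes from Flume itself.

```shell
# Sketch: hand a finished file to the spooling directory under a unique name.
# deliver_to_spool is a hypothetical helper, not part of Flume.
deliver_to_spool() {
    src="$1"
    spool_dir="$2"
    # Timestamp plus shell PID makes name reuse unlikely (illustrative scheme).
    target="$spool_dir/$(basename "$src").$(date +%Y%m%d%H%M%S).$$"
    # Refuse to reuse a name, whether the file is still pending or already processed.
    if [ -e "$target" ] || [ -e "$target.COMPLETED" ]; then
        echo "name already used: $target" >&2
        return 1
    fi
    # mv is an atomic rename only when src and spool_dir are on the same filesystem.
    mv "$src" "$target" && echo "$target"
}
```

    For example, deliver_to_spool /tmp/test.txt /home/fantj/log moves the file into the watched directory and prints the name it was given.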

    2.2 Start the agent

    flume-ng agent -c conf -f ../conf/spooldir.conf -n agent1 -Dflume.root.logger=INFO,console

    2.3 Test

    I create a text file in the /home/fantj/log directory.

    test.txt
    
    this is a spoordir agent test
    

    Then check the output on the Flume agent side:

     19:00:24 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/fantj/log/test.txt to /home/fantj/log/test.txt.COMPLETED
     19:00:24 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
     19:00:24 INFO hdfs.BucketWriter: Creating hdfs://s166/weblog/flume-collection/18-07-27//access_log.1532732424184.tmp
    

    Once Flume has finished reading the file, it renames it with a suffix, test.txt.COMPLETED, to mark it as successfully processed.
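
    That suffix makes it easy to see which files are still waiting. The sketch below assumes a hypothetical helper spool_status and relies only on the .COMPLETED suffix visible in the log above.

```shell
# Sketch: report which files in a spooling directory Flume has finished with.
# spool_status is a hypothetical helper, not part of Flume.
spool_status() {
    dir="$1"
    for f in "$dir"/*; do
        [ -f "$f" ] || continue            # skip subdirectories and an unmatched glob
        case "$f" in
            *.COMPLETED) echo "done    ${f##*/}" ;;
            *)           echo "pending ${f##*/}" ;;
        esac
    done
}
```

    Running spool_status /home/fantj/log after the test above would list test.txt.COMPLETED as done.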

    Now open the Hadoop web UI: http://192.168.27.166:50070




    Open the file there to view its contents.
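
    The same check can be made from the command line instead of the web UI. The sketch below assumes the hdfs client is on the PATH and uses a hypothetical helper show_collected; its defaults mirror hdfs.path and hdfs.filePrefix from spooldir.conf.

```shell
# Sketch: list and print what the HDFS sink wrote for one day.
# show_collected is a hypothetical helper, not part of Flume or Hadoop.
show_collected() {
    base="${1:-hdfs://s166/weblog/flume-collection}"
    day="${2:-$(date +%y-%m-%d)}"     # same two-digit-year layout as %y-%m-%d
    hdfs dfs -ls "$base/$day/" || return 1
    # The glob is quoted so that HDFS, not the local shell, expands it.
    hdfs dfs -cat "$base/$day/access_log.*"
}
```

    With no arguments it looks at today's directory; pass a base path and a date such as 18-07-27 to inspect another day.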

    Example 3: Watch a specified file and collect every new change into HDFS

    sources.type: exec
    sinks.type: hdfs
    
    3.1 Configuration file

    As before, create a file named exec.conf:

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /home/fantj/log/web_log.log
    agent1.sources.source1.channels = channel1
    
    #configure host for source
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    #a1.sinks.k1.channel = c1
    agent1.sinks.sink1.hdfs.path =hdfs://s166/weblog/flume/%y-%m-%d/
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
    

    This watches the file /home/fantj/log/web_log.log with tail -F and uploads newly appended lines to hdfs://s166/weblog/flume/%y-%m-%d/.

    3.2 Start the agent

    flume-ng agent -c conf -f ../conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console

    3.3 Test

    I append the following lines to this file:

    test
    test
    test
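
    The append can also be scripted so that the test is repeatable. The sketch below assumes a hypothetical helper append_log_lines; it uses >> so existing content is kept and tail -F only sees the new lines.

```shell
# Sketch: append a few timestamped lines to the file the exec source is tailing.
# append_log_lines is a hypothetical helper, not part of Flume.
append_log_lines() {
    file="$1"
    count="${2:-3}"          # number of lines to append, 3 by default
    i=1
    while [ "$i" -le "$count" ]; do
        printf '%s test line %d\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$i" >> "$file"
        i=$((i + 1))
    done
}
```

    For example, append_log_lines /home/fantj/log/web_log.log 3 adds three lines to the watched file.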
    

    Then check the output on the Flume agent side:

    19:15:54 INFO hdfs.BucketWriter: Creating hdfs://s166/weblog/flume/18-07-27//access_log.1532733353751.tmp
    19:16:56 INFO hdfs.BucketWriter: Closing hdfs://s166/weblog/flume/18-07-27//access_log.1532733353751.tmp
    19:16:56 INFO hdfs.BucketWriter: Renaming hdfs://s166/weblog/flume/18-07-27/access_log.1532733353751.tmp to hdfs://s166/weblog/flume/18-07-27/access_log.1532733353751
    

    Creating (logged right after startup) -> Closing -> Renaming (logged after the file has been modified)
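
    The gap of about one minute between Creating and Closing in the log is consistent with hdfs.rollInterval = 60: the HDFS sink rolls a file as soon as any one of rollInterval (seconds), rollSize (bytes), or rollCount (events) is reached. The sketch below gives a rough estimate of which threshold fires first. The helper first_roll_trigger, the steady event rate, and the average event size are all assumptions made for illustration; the thresholds are the values from exec.conf.

```shell
# Sketch: estimate which HDFS sink roll threshold is reached first.
# first_roll_trigger is a hypothetical helper, not part of Flume.
first_roll_trigger() {
    events_per_sec="$1"     # assumed steady event rate (integer, > 0)
    bytes_per_event="$2"    # assumed average event size in bytes (integer, > 0)
    roll_interval=60        # hdfs.rollInterval, seconds
    roll_size=102400        # hdfs.rollSize, bytes
    roll_count=1000000      # hdfs.rollCount, events

    # Seconds until each threshold is hit (integer division, rounded down).
    size_secs=$(( roll_size / (events_per_sec * bytes_per_event) ))
    count_secs=$(( roll_count / events_per_sec ))

    trigger="rollInterval"
    secs="$roll_interval"
    if [ "$size_secs" -lt "$secs" ]; then trigger="rollSize"; secs="$size_secs"; fi
    if [ "$count_secs" -lt "$secs" ]; then trigger="rollCount"; secs="$count_secs"; fi
    echo "$trigger after about ${secs}s"
}

first_roll_trigger 1 5       # a trickle of short lines
first_roll_trigger 100 200   # a busier log
```

    With only a few short lines, as in this test, the time limit is reached long before the size or count limit, which is why the file closes roughly 60 seconds after it was created.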
    As before, I download the file and open it:



      Article link: https://www.haomeiwen.com/subject/jprzmftx.html