Flume Data Collection

Author: pamperxg | Published 2017-08-11 21:50

    Introduction

    The Flume website has a user guide.
    What it does: log collection, aggregation, and transport.
    Core component: the agent.
    Components inside an agent: source, sink, channel (a buffer).

    These components can be thought of as abstract classes with many concrete implementations; when using Flume, you choose the implementation you need in the configuration.

    How it works

    [Figure: Flume's operating mechanism]

    Agents can be organized into a topology:

    [Figure: agent topology]

    Deployment and running

    Upload the tarball and unpack it; that is all there is to installing it.
    Then configure a collection plan.

    • Example 1:
      Receive data from a network port and sink it to a logger.
      Create a collection config file and put it under the conf directory:
    netcat-logger.conf
    # Name the components on this agent
    #i.e. give the three components names
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    #Type: netcat receives data from a network port; it runs on this machine, so bind to localhost
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    #The source effectively acts as a small server listening on this port
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    #Events flow to the sink batch by batch, one event at a time within a batch. Channel parameters:
    #capacity: the maximum number of events the channel can hold
    #transactionCapacity: the maximum number of events taken from the source or delivered to the sink per transaction
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    log4j can route log output to the console, files, GUI components, and even a socket server, the NT event log, or the UNIX syslog daemon.
    Event: the unit of data a source takes in at one time.
    Start the agent:
    Tell Flume to start an agent and point it at the configuration:
    $ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
    The trailing -Dflume.root.logger=INFO,console is a JVM property passed to log4j so that the logger sink prints to the console.
    Send in some data:

    [hadoop@mini1 ~]$ telnet localhost 44444
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    nishishui
    OK
    
    2017-08-12 00:09:21,505 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 6E 69 73 68 69 73 68 75 69 0D                   nishishui. }
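    If telnet is not installed on the machine, the same test can be done with nc; a small sketch (each line you send becomes one event):

    echo "hello flume" | nc localhost 44444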
    
    
    • Example 2:
      Monitor a directory with the spooling directory source:
    spool-logger.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    #Watch a directory: spoolDir sets the directory to monitor; fileHeader controls whether the file's path is added as an event header
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/flumespool
    a1.sources.r1.fileHeader = true
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
    
    move file /home/hadoop/flumespool/t.dat to /home/hadoop/flumespool/t.dat.COMPLETED
    

    Finally, drop files into /home/hadoop/flumespool and watch the agent pick them up.
    Do not place two files with the same name in the directory (the source will fail on a duplicate); a safe way to hand files over is sketched below.
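    A minimal sketch of handing files to the spooling directory source: write the file somewhere else first, give it a unique name, then move it into the watched directory (the move is atomic when both paths are on the same filesystem). The staging directory, the source file /var/log/app.log, and the timestamp naming are assumptions for illustration only:

    #stage the file outside the spool dir, then move it in under a unique name
    STAGING=/home/hadoop/staging
    TS=$(date +%Y%m%d%H%M%S)
    cp /var/log/app.log "$STAGING/app-$TS.log"
    mv "$STAGING/app-$TS.log" /home/hadoop/flumespool/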
    Other source types:
    avro source: Avro is a cross-platform serialization framework that sends serialized data over the network; Flume can receive it.
    thrift
    exec: the output of a Unix command
    kafka: a distributed message queue/cache

    When the netcat source binds to localhost, telnet can only connect from the local machine; change the bind address to the hostname mini1 and other machines can connect as well. Similarly, start-yarn cannot be run on another machine, because the configuration binds the service to mini1:8031. In socket programming, a socket server can only bind to one of its own local addresses.
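    A quick way to check which case you are in is to look at the address the port is actually bound to; a sketch using the netcat-logger example above:

    #127.0.0.1:44444 means only local clients can connect;
    #the host's LAN address or 0.0.0.0 means remote machines can connect too
    netstat -nltp | grep 44444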

    • Example 3:
      Tail a log file and sink it to HDFS.
    tail-hdfs.conf
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    #tail -F follows the file by name (survives rotation); tail -f follows the inode
    a1.sources.r1.command = tail -F /home/hadoop/log/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    #switch to a new directory every 10 minutes
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    #file roll interval in seconds
    a1.sinks.k1.hdfs.rollInterval = 3
    #roll the file when it reaches this size in bytes
    a1.sinks.k1.hdfs.rollSize = 500
    #roll the file after this many events have been written
    a1.sinks.k1.hdfs.rollCount = 20
    a1.sinks.k1.hdfs.batchSize = 5
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #output file type: the default is SequenceFile; DataStream writes plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
    
    mkdir /home/hadoop/log
    touch /home/hadoop/log/test.log
    
    while true
    do
    echo 111111111111111 >> /home/hadoop/log/test.log
    sleep 0.5
    done
    
    tail -F /home/hadoop/log/test.log
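    As an aside on the -F flag in the source command: you can watch the difference by rotating the file while the writer loop above is still running. tail -F notices the rename and reopens the new test.log, while tail -f would keep following the old inode. A sketch:

    #rotate the log; the writer loop recreates test.log on its next append
    mv /home/hadoop/log/test.log /home/hadoop/log/test.log.1
    echo "after rotation" >> /home/hadoop/log/test.log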
    
    start-dfs.sh
    
    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
    
    hdfs dfsadmin -report    #make sure all datanodes have reported in; right after startup HDFS may still be in safe mode
    
    hadoop fs -ls /    #or browse the NameNode web UI at mini1:50070
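    To confirm the sink is rolling files the way rollInterval/rollSize/rollCount describe, list the time-bucketed directories and peek at one of the rolled files (paths follow the hdfs.path and filePrefix configured above); a sketch:

    hadoop fs -ls /flume/events/
    #each %y-%m-%d/%H%M bucket should hold small events-* files rolled every few seconds
    hadoop fs -cat /flume/events/*/*/events-* | head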
    

    inode: to understand inodes, start with how files are stored.
    Files live on disk, and the smallest storage unit of a disk is the sector, which holds 512 bytes (0.5 KB). When the operating system reads from disk it does not read one sector at a time, which would be far too slow; it reads several consecutive sectors at once, i.e. one block. A block, made up of multiple sectors, is the smallest unit of file I/O; the most common block size is 4 KB, i.e. eight consecutive sectors.
    File data is stored in blocks, so there obviously has to be somewhere to store a file's metadata as well, such as who created it, when it was created, and how large it is. The region that stores this metadata is the inode, the "index node".
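    You can look at a file's inode number and metadata directly; a quick sketch using the test log from the example above:

    ls -i /home/hadoop/log/test.log      #prints the inode number before the file name
    stat /home/hadoop/log/test.log       #prints size, blocks, inode, link count and timestamps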

    Linux soft links and hard links:
    ln -s /xxx /xxx    (a shortcut; the link is itself a small file)
    Example:
    ln -s /home/hadoop/aaa/ /home/bbb
    rm /home/bbb       (removes only the shortcut file)
    rm -rf /home/bbb/* (the dangerous case: this follows the link and deletes the data inside the target directory)
    ln /xxx /xxx       (a hard link: both names refer to the same file and the same inode; each file name is just another reference to that inode)
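    A small sketch that makes the inode relationship visible (the file names are hypothetical):

    touch demo.txt
    ln demo.txt demo-hard.txt        #hard link: same inode number, link count becomes 2
    ln -s demo.txt demo-soft.txt     #soft link: a separate small file with its own inode
    ls -li demo.txt demo-hard.txt demo-soft.txt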

    • Example 4:
      Chaining agents: the first agent tails a file and sends events over Avro; the second agent receives them over Avro and logs them (or writes to HDFS).

    tail-avro.conf (first agent, runs on mini1)
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/hadoop/log/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    #the avro sink is the data sender in this pair
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = mini2
    a1.sinks.k1.port = 4141
    a1.sinks.k1.batch-size = 2
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    
    
    avro-logger.conf (second agent, runs on mini2; the sink could just as well be hdfs instead of logger)
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    #the avro source is the data receiver
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4141
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    
    Sending data to an avro source with the built-in Avro client:
    $ bin/flume-ng avro-client -H localhost -p 4141 -F /usr/logs/log.10
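    Each line of the file is sent to the avro source as one event, and the client can run on any machine that can reach the source. A sketch with a hypothetical test file, pointed at the avro source on mini2 from this example:

    printf "line1\nline2\n" > /tmp/avro-test.log
    bin/flume-ng avro-client -H mini2 -p 4141 -F /tmp/avro-test.log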
    
    First, let's see the chained tail-avro → avro-logger pipeline in action:
    scp -r /flume  mini2:$PWD
    
    mini1:  vi tail-avro.conf
    
    mini2:  vi avro-logger.conf
    
    bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
    
    netstat -nltp    #check that the avro source is listening on port 4141
    
    bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1
    
    This setup is not highly available on its own, but you can write a script to watch the agents (a minimal watchdog sketch follows).
    Some specialized scenarios are hard to cover with the built-in components; the common log-collection cases work, but the rest need custom development.
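    A minimal watchdog sketch, assuming the first agent was started with conf/tail-avro.conf as above; the process match string and log path are assumptions. Run it periodically from cron or a loop:

    #restart the agent if its process is no longer running
    if ! pgrep -f "org.apache.flume.node.Application.*tail-avro.conf" > /dev/null; then
        nohup bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 >> /tmp/flume-watchdog.log 2>&1 &
    fi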
    
    [Figure: chained agents]
