Flume Notes

Author: 霹雳解锋镝 | Published 2019-12-24 16:21

    Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data.

    I. Main components
    1. An Agent is a JVM process that moves data from a source to a destination in the form of events; it is the basic unit of Flume data transfer. An Agent consists of three main parts: Source, Channel, and Sink.
    2. A Source is the component that receives data into the Flume Agent. Sources can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
    3. A Channel is a buffer between the Source and the Sink, which allows the Source and the Sink to operate at different rates. Channels are thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
    4. Flume ships with two Channel types: Memory Channel and File Channel (a minimal File Channel configuration is sketched after this list).
        Memory Channel is an in-memory queue, suitable when losing data is acceptable.
        File Channel writes every event to disk, so no data is lost if the process exits or the machine crashes.
    5. A Sink continuously polls the Channel for events, removes them in batches, and writes them to a storage or indexing system, or forwards them to another Flume Agent.
        Sinks are fully transactional: before removing a batch of events from the Channel, a Sink opens a transaction with that Channel. Once the batch has been successfully written to the storage system or the next Flume Agent, the Sink commits the transaction; only after the commit does the Channel delete those events from its internal buffer.
        Sink destinations include hdfs, logger, avro, thrift, ipc, file, HBase, solr, and custom sinks.
    6. An Event is the basic unit of data transfer in Flume; data travels from source to destination in the form of events.
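    For reference, a minimal File Channel configuration might look like the sketch below. The checkpoint and data directory paths are placeholders chosen for illustration, not values from this article:
        # File Channel: events are persisted to disk and survive restarts
        a1.channels.c1.type = file
        # Directory where the channel checkpoints its state (placeholder path)
        a1.channels.c1.checkpointDir = /tmp/flume/checkpoint
        # Comma-separated list of directories for event data files (placeholder path)
        a1.channels.c1.dataDirs = /tmp/flume/data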
    
    II. Flume usage examples
    1. Monitoring port data
        (1) Check whether the port is already in use
            netstat -tunlp | grep 44444
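            On systems without netstat, ss performs the same check (an equivalent alternative, not part of the original steps):
            ss -tunlp | grep 44444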
            
        (2) vim flume-telnet-logger.conf
            # Name the components on this agent
            # Source (input), sink (destination), and channel (buffer) names
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Describe/configure the source
            # Source type: netcat, plus the host and port to listen on
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 44444
    
            # Describe the sink
            # Sink type: logger, which writes events to the console
            a1.sinks.k1.type = logger
    
            # Use a channel which buffers events in memory
            # Channel type: memory; capacity 1000 events, 100 events per transaction
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
        (3) First, start Flume listening on the port
            bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
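            A note on the flags: --conf points at Flume's configuration directory, --name selects which agent defined in the file to run (a1 here, matching the property prefix), --conf-file is the agent definition, and -Dflume.root.logger=INFO,console routes Flume's own log output to the console so the logger sink's events are visible.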
        (4) Use telnet to send data to port 44444 on the local machine
            telnet localhost 44444
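            If telnet is not installed, nc (netcat) serves the same purpose for this test:
            nc localhost 44444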
    
    2. Tailing a local file into HDFS in real time
        (1) flume-file-hdfs.conf
            # Name the components on this agent
            a2.sources = r2
            a2.sinks = k2
            a2.channels = c2
    
            # Describe/configure the source
            # Source type: exec, which runs a command and reads its output
            a2.sources.r2.type = exec
            a2.sources.r2.command = tail -F /tmp/hadoop/hive.log
            a2.sources.r2.shell = /bin/bash -c
    
            # Describe the sink
            a2.sinks.k2.type = hdfs
            a2.sinks.k2.hdfs.path = hdfs://hadoop130:9000/flume/%Y%m%d/%H
            # Prefix for files uploaded to HDFS
            a2.sinks.k2.hdfs.filePrefix = logs-
            # Whether to round the timestamp down, rolling directories by time
            a2.sinks.k2.hdfs.round = true
            # How many time units before a new directory is created
            a2.sinks.k2.hdfs.roundValue = 1
            # The time unit used for rounding
            a2.sinks.k2.hdfs.roundUnit = hour
            # Use the local timestamp instead of a timestamp header in the event
            a2.sinks.k2.hdfs.useLocalTimeStamp = true
            # Number of events to accumulate before flushing to HDFS
            a2.sinks.k2.hdfs.batchSize = 1000
            # File type; DataStream writes plain text (compressed types are also supported)
            a2.sinks.k2.hdfs.fileType = DataStream
            # Roll to a new file every 600 seconds
            a2.sinks.k2.hdfs.rollInterval = 600
            # Roll to a new file once it reaches this size (just under 128 MB)
            a2.sinks.k2.hdfs.rollSize = 134217700
            # 0 disables rolling based on event count
            a2.sinks.k2.hdfs.rollCount = 0
            # Minimum HDFS block replicas; 1 avoids premature rolls triggered by replication
            a2.sinks.k2.hdfs.minBlockReplicas = 1
    
            # Use a channel which buffers events in memory
            a2.channels.c2.type = memory
            a2.channels.c2.capacity = 1000
            a2.channels.c2.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a2.sources.r2.channels = c2
            a2.sinks.k2.channel = c2
        (2) Run the agent with this configuration
            bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/flume-file-hdfs.conf
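            To verify, append a line to the tailed file and list the target HDFS directory. The date in the path below is illustrative; it depends on when the agent runs:
            echo 'test line' >> /tmp/hadoop/hive.log
            hdfs dfs -ls /flume/$(date +%Y%m%d)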
    3. Reading files from a directory into HDFS in real time
        (1) Create the configuration file flume-dir-hdfs.conf
            # Name the components on this agent
            a3.sources = r3
            a3.sinks = k3
            a3.channels = c3
    
            # Describe/configure the source
            # Source type: spooldir, which watches a directory for new files
            a3.sources.r3.type = spooldir
            a3.sources.r3.spoolDir = /tmp/hadoop/upload
            # Suffix appended to a file once it has been fully ingested
            a3.sources.r3.fileSuffix = .COMPLETED
            # Whether to add a header with the source file path
            a3.sources.r3.fileHeader = true
            # Ignore (do not upload) any file ending in .tmp
            a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
            # Describe the sink
            a3.sinks.k3.type = hdfs
            a3.sinks.k3.hdfs.path = hdfs://hadoop130:9000/flume/upload/%Y%m%d/%H
            # Prefix for files uploaded to HDFS
            a3.sinks.k3.hdfs.filePrefix = upload-
            # Whether to round the timestamp down, rolling directories by time
            a3.sinks.k3.hdfs.round = true
            # How many time units before a new directory is created
            a3.sinks.k3.hdfs.roundValue = 1
            # The time unit used for rounding
            a3.sinks.k3.hdfs.roundUnit = hour
            # Use the local timestamp instead of a timestamp header in the event
            a3.sinks.k3.hdfs.useLocalTimeStamp = true
            # Number of events to accumulate before flushing to HDFS
            a3.sinks.k3.hdfs.batchSize = 100
            # File type; DataStream writes plain text (compressed types are also supported)
            a3.sinks.k3.hdfs.fileType = DataStream
            # Roll to a new file every 600 seconds
            a3.sinks.k3.hdfs.rollInterval = 600
            # Roll to a new file once it reaches this size (just under 128 MB)
            a3.sinks.k3.hdfs.rollSize = 134217700
            # 0 disables rolling based on event count
            a3.sinks.k3.hdfs.rollCount = 0
            # Minimum HDFS block replicas; 1 avoids premature rolls triggered by replication
            a3.sinks.k3.hdfs.minBlockReplicas = 1
    
            # Use a channel which buffers events in memory
            a3.channels.c3.type = memory
            a3.channels.c3.capacity = 1000
            a3.channels.c3.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a3.sources.r3.channels = c3
            a3.sinks.k3.channel = c3
        (2) Run the agent with this configuration
            bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/flume-dir-hdfs.conf
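            To test, copy a file into the spooling directory; Flume ingests it and renames it with the .COMPLETED suffix. /etc/hosts is just a convenient sample file:
            cp /etc/hosts /tmp/hadoop/upload/
            ls /tmp/hadoop/upload
            hdfs dfs -ls /flume/upload/$(date +%Y%m%d)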
    4. Single data source, multiple outputs (replicating)
        Configure one source reading the log file, with two channels and two sinks that feed flume-flume-hdfs and flume-flume-dir respectively.
        (1) Create flume-file-flume.conf
            # Name the components on this agent
            a1.sources = r1
            a1.sinks = k1 k2
            a1.channels = c1 c2
            # Replicate the data flow to every channel
            a1.sources.r1.selector.type = replicating
    
            # Describe/configure the source
            a1.sources.r1.type = exec
            a1.sources.r1.command = tail -F /tmp/hadoop/hive.log
            a1.sources.r1.shell = /bin/bash -c
    
            # Describe the sink
            a1.sinks.k1.type = avro
            a1.sinks.k1.hostname = hadoop132
            a1.sinks.k1.port = 4141
    
            a1.sinks.k2.type = avro
            a1.sinks.k2.hostname = hadoop132
            a1.sinks.k2.port = 4142
    
            # Describe the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            a1.channels.c2.type = memory
            a1.channels.c2.capacity = 1000
            a1.channels.c2.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1 c2
            a1.sinks.k1.channel = c1
            a1.sinks.k2.channel = c2
            
            Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the founder of Hadoop.
            Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over the network, without needing to understand the underlying network technology.
        (2) Create flume-flume-hdfs.conf
            # Name the components on this agent
            a2.sources = r1
            a2.sinks = k1
            a2.channels = c1
    
            # Describe/configure the source
            a2.sources.r1.type = avro
            a2.sources.r1.bind = hadoop132
            a2.sources.r1.port = 4141
    
            # Describe the sink
            a2.sinks.k1.type = hdfs
            a2.sinks.k1.hdfs.path = hdfs://hadoop130:9000/flume2/%Y%m%d/%H
            # Prefix for files uploaded to HDFS
            a2.sinks.k1.hdfs.filePrefix = flume2-
            # Whether to round the timestamp down, rolling directories by time
            a2.sinks.k1.hdfs.round = true
            # How many time units before a new directory is created
            a2.sinks.k1.hdfs.roundValue = 1
            # The time unit used for rounding
            a2.sinks.k1.hdfs.roundUnit = hour
            # Use the local timestamp instead of a timestamp header in the event
            a2.sinks.k1.hdfs.useLocalTimeStamp = true
            # Number of events to accumulate before flushing to HDFS
            a2.sinks.k1.hdfs.batchSize = 100
            # File type; DataStream writes plain text (compressed types are also supported)
            a2.sinks.k1.hdfs.fileType = DataStream
            # Roll to a new file every 600 seconds
            a2.sinks.k1.hdfs.rollInterval = 600
            # Roll to a new file once it reaches this size (just under 128 MB)
            a2.sinks.k1.hdfs.rollSize = 134217700
            # 0 disables rolling based on event count
            a2.sinks.k1.hdfs.rollCount = 0
            # Minimum HDFS block replicas; 1 avoids premature rolls triggered by replication
            a2.sinks.k1.hdfs.minBlockReplicas = 1
    
            # Describe the channel
            a2.channels.c1.type = memory
            a2.channels.c1.capacity = 1000
            a2.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a2.sources.r1.channels = c1
            a2.sinks.k1.channel = c1
            
        (3) Create flume-flume-dir.conf
            # Name the components on this agent
            a3.sources = r1
            a3.sinks = k1
            a3.channels = c2
    
            # Describe/configure the source
            a3.sources.r1.type = avro
            a3.sources.r1.bind = hadoop132
            a3.sources.r1.port = 4142
    
            # Describe the sink
            a3.sinks.k1.type = file_roll
            a3.sinks.k1.sink.directory = /demo/flume3
    
            # Describe the channel
            a3.channels.c2.type = memory
            a3.channels.c2.capacity = 1000
            a3.channels.c2.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a3.sources.r1.channels = c2
            a3.sinks.k1.channel = c2
        (4) Run the configurations, downstream agents first
            bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group1/flume-flume-dir.conf
            bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group1/flume-flume-hdfs.conf
            bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group1/flume-file-flume.conf
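            a3 and a2 host the avro sources, so they must already be running when a1 starts; that is why the commands are listed in this order. The file_roll sink expects its output directory to exist, so create it before starting a3:
            mkdir -p /demo/flume3
            Once events flow, check both destinations (the date component depends on the run time):
            hdfs dfs -ls /flume2/$(date +%Y%m%d)
            ls /demo/flume3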
            
    5. Single data source, multiple outputs (load balancing)
        (1) Create flume-netcat-flume.conf
            # Name the components on this agent
            a1.sources = r1
            a1.channels = c1
            a1.sinkgroups = g1
            a1.sinks = k1 k2
    
            # Describe/configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 44444
    
            # Load-balancing sink group with a round-robin selector
            a1.sinkgroups.g1.processor.type = load_balance
            a1.sinkgroups.g1.processor.backoff = true
            a1.sinkgroups.g1.processor.selector = round_robin
            a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
    
            # Describe the sink
            a1.sinks.k1.type = avro
            a1.sinks.k1.hostname = hadoop132
            a1.sinks.k1.port = 4141
    
            a1.sinks.k2.type = avro
            a1.sinks.k2.hostname = hadoop132
            a1.sinks.k2.port = 4142
    
            # Describe the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinkgroups.g1.sinks = k1 k2
            a1.sinks.k1.channel = c1
            a1.sinks.k2.channel = c1
        (2) Create flume-flume1.conf
            # Name the components on this agent
            a2.sources = r1
            a2.sinks = k1
            a2.channels = c1
    
            # Describe/configure the source
            a2.sources.r1.type = avro
            # Must match the host/port that a1's k1 sink sends to
            a2.sources.r1.bind = hadoop132
            a2.sources.r1.port = 4141
    
            # Describe the sink
            a2.sinks.k1.type = logger
    
            # Describe the channel
            a2.channels.c1.type = memory
            a2.channels.c1.capacity = 1000
            a2.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a2.sources.r1.channels = c1
            a2.sinks.k1.channel = c1
        (3) Create flume-flume2.conf
            # Name the components on this agent
            a3.sources = r1
            a3.sinks = k1
            a3.channels = c2
    
            # Describe/configure the source
            a3.sources.r1.type = avro
            # Must match the host/port that a1's k2 sink sends to
            a3.sources.r1.bind = hadoop132
            a3.sources.r1.port = 4142
    
            # Describe the sink
            a3.sinks.k1.type = logger
    
            # Describe the channel
            a3.channels.c2.type = memory
            a3.channels.c2.capacity = 1000
            a3.channels.c2.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a3.sources.r1.channels = c2
            a3.sinks.k1.channel = c2
        (4) Run the configurations, downstream agents first
            bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group2/flume-flume2.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group2/flume-flume1.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group2/flume-netcat-flume.conf
        (5) Use telnet to send data to port 44444 on the local machine
            telnet localhost 44444
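            Sending several lines makes the round-robin distribution visible: consecutive events should alternate between the two logger consoles. A quick loop, assuming an nc variant that supports -q (which closes the connection shortly after sending):
            for i in $(seq 1 10); do echo "message $i" | nc -q 1 localhost 44444; done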
    6. Aggregating multiple data sources
        Multiple sources funnel their data into a single Flume agent.
        (1) Create flume1.conf
            # Name the components on this agent
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Describe/configure the source
            a1.sources.r1.type = exec
            a1.sources.r1.command = tail -F /opt/module/group.log
            a1.sources.r1.shell = /bin/bash -c
    
            # Describe the sink
            a1.sinks.k1.type = avro
            # Send to the aggregating agent's avro source (a3 on hadoop132)
            a1.sinks.k1.hostname = hadoop132
            a1.sinks.k1.port = 4141
    
            # Describe the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
        (2) Create flume2.conf
            # Name the components on this agent
            a2.sources = r1
            a2.sinks = k1
            a2.channels = c1
    
            # Describe/configure the source
            a2.sources.r1.type = netcat
            a2.sources.r1.bind = hadoop130
            a2.sources.r1.port = 44444
    
            # Describe the sink
            a2.sinks.k1.type = avro
            a2.sinks.k1.hostname = hadoop132
            a2.sinks.k1.port = 4141
    
            # Use a channel which buffers events in memory
            a2.channels.c1.type = memory
            a2.channels.c1.capacity = 1000
            a2.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a2.sources.r1.channels = c1
            a2.sinks.k1.channel = c1
        (3) Create flume3.conf
            # Name the components on this agent
            a3.sources = r1
            a3.sinks = k1
            a3.channels = c1
    
            # Describe/configure the source
            a3.sources.r1.type = avro
            a3.sources.r1.bind = hadoop132
            a3.sources.r1.port = 4141
    
            # Describe the sink
            a3.sinks.k1.type = logger
    
            # Describe the channel
            a3.channels.c1.type = memory
            a3.channels.c1.capacity = 1000
            a3.channels.c1.transactionCapacity = 100
    
            # Bind the source and sink to the channel
            a3.sources.r1.channels = c1
            a3.sinks.k1.channel = c1
        (4) Run the configurations, starting the aggregating agent a3 first
            bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group3/flume3.conf -Dflume.root.logger=INFO,console
            bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group3/flume2.conf
            bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group3/flume1.conf
        (5) Append data
            echo 'hello' >> /opt/module/group.log
            telnet hadoop130 44444
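            Both the appended line and anything typed into the telnet session should appear in a3's logger console. To generate a steady stream for the exec source (illustrative):
            for i in $(seq 1 5); do echo "line $i" >> /opt/module/group.log; done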
