美文网首页
Flume 入门

Flume 入门

作者: djm猿 | 来源:发表于2019-09-26 21:37 被阅读0次

    1Flume概述

    1.1 定义

    FlumeCloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统;

    Flume基于流式架构,灵活简单。

    1.2 特点

    可以和任意存储进程集成

    输入的的数据速率大于写入目的存储的速率,Flume会进行缓冲,减小HDFS的压力

    Flume中的事务基于Channel,使用了两个事务模型(sender+ receiver),确保消息被可靠发送

    Flume使用两个独立的事务分别负责从SoucrceChannel,以及从ChannelSink 的事件传递。一旦事务中所有的数据全部成功提交到Channel,那么Source才认为该数据读取完成,同理,只有成功被Sink写出去的数据,才会从Channel中移除

    1.3 组成架构

    image

    1.3.1Agent

    Agent是一个JVM进程,它以事件的形式将数据从源头传递到目的地

    Agent主要由SourceChannelSink组成

    1.3.2Source

    Source是负责接收数据到Agent的组件,可以处理各种类型,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy

    1.3.3Channel

    Channel是位于SourceSink之间的缓冲区,因此,Channel允许SourceSink运作在不同的速率上,Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

    Flume自带两种Channel

    Memory Channel:内存中的队列速度快,适合在不需要关系数据丢失的情境下使用

    File Channel:将所有事件写入磁盘,因此在程序关闭或机器宕机的情况下不会丢失数据

    1.3.4Sink

    Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent

    Sink是完全事务性的,在从Channel批量删除数据之前,每个SinkChannel启动一个事务,批量事件一旦成功写出到存储系统或下一个Flume AgentSink就利用Channel提交事务,事务一旦被提交,该Channel从自己的内部缓冲区删除事件。

    Sink组件目的地包括hdfsloggeravrothriftipcfilenullHBasesolr、自定义。

    1.3.5Event

    传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

    Event由可选的header和载有数据的一个byte array构成,Header是容纳了key-value字符串对的HashMap

    通常一条数据就是一个 Event,每2048个字节划分一个Event

    1.4 拓扑结构

    image

    这种模式是将多个Flume给顺序连接起来了,从最初的Source开始到最终Sink传送的目的存储系统,此模式不建议桥接过多的Flume数量, Flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点Flume宕机,会影响整个传输系统。

    image

    Flum支持将事件流向一个或者多个目的地,这种模式将数据源复制到多个Channel中,每个Channel都有相同的数据,Sink可以选择传送的不同的目的地。

    image

    Flume支持使用将多个Sink逻辑上分到一个Sink组,Flume将数据发送到不同的Sink,主要解决负载均衡和故障转移问题。

    image

    这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器,产生的日志,处理起来也非常麻烦,用Flume的这种组合方式能很好的解决这一问题,每台服务器部署一个Flume采集日志,传送到一个集中收集日志的Flume,再由此Flume上传到 hdfshivehbasejms等进行日志分析。

    1.5Agent原理

    image

    2Flume部署

    1、解压apache-flume-1.7.0-bin.tar.gz/opt/module目录下

    2、修改apache-flume-1.7.0-bi的名称为flume

    3、将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh中的JAVA_HOME

    3 企业开发案例

    3.1 监控端口数据

    需求分析:

    服务端监听本机44444端口

    服务端使用netcat工具向44444端口发送消息

    最后将数据展示在控制台上

    实现步骤:

    1、在job文件夹下创建Agent配置文件flume-netcat-logger.conf

    [djm@hadoop102 job]$ vim flume-netcat-logger.conf
    

    2、添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    3、启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger==INFO,console
    

    参数说明:

    --conf conf/表示配置文件存储在conf/目录

    --name a1表示给 Agent 起名为a1

    --conf-file job/flume-netcat.conf Flume本次启动读取的配置文件是在job文件夹下的 flume-telnet.conf文件

    -Dflume.root.logger==INFO,console -D表示Flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别

    3.2 实时读取本地文件到HDFS

    需求分析:

    实时监控Hive日志,并上传到HDFS

    实现步骤:

    1、在job文件夹下创建Agent配置文件flume-file-hdfs.conf

    [djm@hadoop102 job]$ vim flume-file-hdfs.conf
    

    2、添加如下内容:

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
    a2.sources.r2.shell = /bin/bash -c
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a2.sinks.k2.hdfs.batchSize = 1000
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k2.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a2.sinks.k2.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    

    3、启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 –f job/flume-file-hdfs.conf
    

    注意:

    要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令,由于Hive日志在Linux系统中所以读取文件的类型选择:execexecute执行的意思。表示执行Linux命令来读取文件。

    3.3 实时读取目录文件到 HDFS

    需求分析:

    使用Flume监听整个目录的文件

    实现步骤:

    1、在job文件夹下创建Agent配置文件flume-dir-hdfs.conf

    [djm@hadoop102 job]$ vim flume-dir-hdfs.conf
    

    2、添加如下内容:

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/module/flume/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #忽略所有以.tmp结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    

    3、启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 –f job/flume-dir-hdfs.conf
    

    注意:

    不要在监控目录中创建并持续修改文件

    3.4 单数据源多出口案例(选择器)

    需求分析:

    使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2

    Flume-2负责存储到HDFS

    同时Flume-1将变动内容传递给Flume-3Flume-3负责输出到Local FileSystem

    1、在group1文件夹下创建Agent配置文件flume-file-flume.conf

    [djm@hadoop102 group1]$ vim flume-file-flume.conf
    

    2、添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # 将数据流复制给所有channel
    a1.sources.r1.selector.type = replicating
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
    a1.sources.r1.shell = /bin/bash -c
    
    # Describe the sink
    # sink端的avro是一个数据发送者
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102 
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    

    3、在group1文件夹下创建Agent配置文件flume-flume-hdfs.conf

    [djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
    

    4、添加如下内容:

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    # source端的avro是一个数据接收服务
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k1.hdfs.filePrefix = flume2-
    #是否按照时间滚动文件夹
    a2.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a2.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a2.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k1.hdfs.rollInterval = 600
    #设置每个文件的滚动大小大概是128M
    a2.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a2.sinks.k1.hdfs.rollCount = 0
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    

    5、在group1文件夹下创建 Agent 配置文件flume-flume-dir.conf

    [djm@hadoop102 group1]$ vim flume-flume-dir.conf
    

    6、添加如下内容:

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/data/flume3
    
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    

    7、启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
    

    注意:

    Avro是一种语言无关的数据序列化和RPC框架

    输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

    必须先启动Sink存在的job

    3.5 单数据源多出口案例(Sink组)

    需求分析:

    使用Flume-1监控端口数据,Flume-1将变动内容传递给Flume-2

    Flume-2负责将数据展示在控制台上

    同时Flume-1将变动内容传递给Flume-3Flume-3也负责将数据展示在控制台上

    实现步骤:

    1、在group2文件夹下创建Agent配置文件flume-netcat-flume.conf

    2、添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin
    a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
    

    3、在group2文件夹下创建Agent配置文件flume-flume-console1.conf

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    
    # Describe the sink
    a2.sinks.k1.type = logger
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    

    5、在 group2文件夹下创建Agent配置文件flume-flume-console2.conf

    6、添加如下内容:

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    
    

    7、启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
    
    

    3.6 多数据源汇总

    需求分析:

    hadoop103上的Flume-1监控文件/opt/module/group.log

    hadoop102上的Flume-2监控某一个端口的数据流

    Flume-1Flume-2将数据发送给hadoop104上的Flume-3Flume-3将最终数据打印到控制台

    实现步骤:

    1、在group3文件夹下创建Agent配置文件flume1-logger-flume.conf

    [djm@hadoop102 group3]$ vim flume1-logger-flume.conf 
    
    

    2、添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/group.log
    a1.sources.r1.shell = /bin/bash -c
    
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4141
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    3、在group3文件夹下创建Agent配置文件flume2-netcat-flume.conf

    [djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
    
    

    4、添加如下内容:

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 44444
    
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4141
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    

    5、在group3文件夹下创建Agent配置文件flume3-flume-logger.conf

    [djm@hadoop102 group3]$ vim flume3-flume-logger.conf
    
    

    6、添加如下内容:

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    
    # Describe the sink
    # Describe the sink
    a3.sinks.k1.type = logger
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    

    7、分发配置文件

    [djm@hadoop102 group3]$ xsync /opt/module/flume/job
    
    

    8、启动任务

    [djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
    [djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
    
    

    4Ganglia部署

    1、安装httpd服务与php

    yum -y install httpd php
    
    

    2、安装其他依赖

    yum -y install rrdtool perl-rrdtool rrdtool-devel
    
    

    3、安装ganglia

    rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
    yum -y install ganglia-gmetad ganglia-gmond ganglia-web 
    
    

    4、修改ganglia配置文件

    vim /etc/httpd/conf.d/ganglia.conf
    
    
    #
    # Ganglia monitoring system php web frontend
    #
    
    Alias /ganglia /usr/share/ganglia
    
    <Location /ganglia>
      # Require local
      Require all granted
      # Require ip 10.1.2.3
      # Require host example.org
    </Location> 
    
    

    特别注意:以下配置是不能起作用的

    <Location /ganglia>
      Order deny,allow
      Allow from all
    </Location> 
    
    

    5、修改gmetad配置文件

    vim /etc/ganglia/gmetad.conf 
    
    
    data_source "hadoop102" 192.168.1.102
    
    

    6、修改gmond配置文件

    vim /etc/ganglia/gmond.conf 
    
    
    cluster {
      #name = "unspecified"
      name = "hadoop102"
      owner = "unspecified"
      latlong = "unspecified"
      url = "unspecified"
    }
    
    udp_send_channel { 
    #bind_hostname = yes # Highly recommended, soon to be default. 
    # This option tells gmond to use a source address
    # that resolves to the machine's hostname. Without
    # this, the metrics may appear to come from any
    # interface and the DNS names associated with
    # those IPs will be used to create the RRDs.
      #mcast_join = 239.2.11.71
      host = 192.168.10.102
      port = 8649
      ttl = 1
    }
    
    /* You can specify as many udp_recv_channels as you like as well. */
    udp_recv_channel {
      #mcast_join = 239.2.11.71
      port = 8649
      #bind = 239.2.11.71
      bind = 192.168.10.102
      retry_bind = true 
    
    # Size of the UDP buffer. If you are handling lots of metrics you really
    # should bump it up to e.g. 10MB or even higher.
    # buffer = 10485760
    } 
    
    

    6、查看SELinux状态

    sestatus
    
    

    如果不是disabled,需修改以下配置文件:

    vim /etc/selinux/config
    
    

    或者临时关闭SELinux

    setenforce 0
    
    

    7、启动ganglia

    systemctl start httpd
    systemctl start gmetad 
    systemctl start gmond
    
    

    8、打开浏览器访问

    http://hadoop102/ganglia/
    
    

    如果完成以上操作仍出现权限不足错误,可修改/var/lib/ganglia目录的权限尝试

    chmod -R 777 /var/lib/ganglia
    
    

    5 自定义Source

    需求分析:

    image

    编码实现:

    1、引入依赖

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    
    

    2、代码编写

    package com.djm.flume;
    
    import org.apache.flume.Context;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    
    import java.util.HashMap;
    
    public class MySource extends AbstractSource implements Configurable, PollableSource {
    
        //定义配置文件将来要读取的字段
        private Long delay;
    
        private String field;
    
        /**
         * 接收数据,将数据封装成一个个event,写入channel
         * @return
         * @throws EventDeliveryException
         */
        public Status process() throws EventDeliveryException {
            HashMap<String, String> hearderMap  = new HashMap<>();
            SimpleEvent event = new SimpleEvent();
                try {
                    for (int i = 0; i < 5; i++) {
                        event.setHeaders(hearderMap);
                        event.setBody((field + i).getBytes());
                        getChannelProcessor().processEvent(event);
                        Thread.sleep(delay);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return Status.BACKOFF;
                }
                return Status.READY;
        }
    
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    
        /**
         * 读取配置文件
         * @param context
         */
        public void configure(Context context) {
            delay = context.getLong("delay");
            field = context.getString("field", "hello");
        }
    }
    
    

    3、打包测试

    利用Maven打包并上传到 /opt/module/flume/lib目录下

    job文件夹下创建Agent配置文件mysource.conf

    [djm@hadoop102 job]$ vim mysource.conf
    
    

    添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = com.djm.flume.MySource
    a1.sources.r1.delay = 1000
    a1.sources.r1.field = djm
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
    
    

    6 自定义Sink

    需求分析:

    image

    编码实现:

    1、引入依赖

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    
    

    2、代码编写

    package com.djm.flume;
    
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySink extends AbstractSink implements Configurable {
    
        private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    
        private String prefix;
        private String suffix;
    
        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;
    
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            try {
                Event event;
                transaction.begin();
                while ((event = channel.take()) == null) {
                    Thread.sleep(200);
                }
                LOG.info(prefix + new String(event.getBody()) + suffix);
                transaction.commit();
                status = Status.READY;
            } catch (Throwable e) {
                transaction.rollback();
                status = Status.BACKOFF;
                if (e instanceof Error)
                    throw (Error) e;
            } finally {
                transaction.close();
            }
            return status;
        }
    
        @Override
        public void configure(Context context) {
            prefix = context.getString("prefix");
            suffix = context.getString("suffix");
        }
    }
    
    

    3、打包测试

    利用Maven打包并上传到 /opt/module/flume/lib目录下

    job文件夹下创建Agent配置文件mysource.conf

    [djm@hadoop102 job]$ vim mysink.conf
    
    

    添加如下内容:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = com.djm.flume.MySink
    a1.sinks.k1.prefix = djm:
    a1.sinks.k1.suffix = :end
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    启动任务

    [djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
    
    

    7Flume参数调优

    7.1Source

    增加Source个数可以增大Source的读取数据的能力,例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个Source以保证Source有足够的能力获取到新产生的数据。

    batchSize参数决定Source一次批量运输到ChannelEvent条数,适当调大这个参数可以提高Source搬运EventChannel时的性能。

    7.2Channel

    Type选择Memory ChannelChannel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据

    Type选择File ChannelChannel的容错性更好,但是性能上会比Memory Channel差,使用File Channel时`dataDirs 配置多个不同盘下的目录可以提高性能。

    Capacity参数决定Channel可容纳最大的Event条数,TransactionCapacity 参数决定每次SourceChannel里面写的最大Event条数和每次SinkChannel里面读的最大Event条数,TransactionCapacity需要大于SourceSinkbatchSize参数。

    7.3Sink

    增加Sink的个数可以增加Sink消费Event的能力,Sink也不是越多越好够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。

    batchSize参数决定Sink一次批量从Channel读取的Event条数,适当调大这个参数可以提高SinkChannel搬出Event的性能。

    相关文章

      网友评论

          本文标题:Flume 入门

          本文链接:https://www.haomeiwen.com/subject/bzgnuctx.html