一、Flume简介
1.1 Flume的位置
1.2 Flume是什么
(1)Flume提供一种分布式的,可靠地,对大量数据的日志进行高效处理,聚集,移动的服务。flume只能在Unix的环境下运行。
(2)Flume基于流式框架,容错性强,也灵活简单。
(3)Flume/Kafka用来进行数据收集的,Spark,Storm用来实时处理数据,impala用来实时查询
(4)Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
1.3 Flume官网
http://flume.apache.org/
1.4 Flume的两个版本
(一)Flume OG
OG:“Original Generation"
0.9x或cdh3以及更早的版本。
有agent、collector、master等组件构成。
(二)Flume NG
NG:“Next/New Generation"
1.x或cdh4以及之后的版本。
有Agent/client等组件构成。
1.5 FlumeOG存在的问题
(1)Flume OG代码工程臃肿
(2)核心组件设计不合理
(3)核心配置不标准
(4 )尤其是在Flume OG的最后一个发行版本0.94.0中,日志传输不稳定的现象尤为严重。
1.6 FlumeNG的特点
(1)NG只有一种角色的节点:代理节点(agent)
(2)没有collector,master节点。这是核心组件最核心的变化。
(3)去除了physical nodes,local nodes 的概念和相关内容。
(4)agent节点的组成也发生了变化,脱离了zookeeper。
1.7 Flume流程图
1.8 Flume几个核心概念
(一)Event
(1)Event是Flume数据传输的基本单元。
(2)Flume以事件的形式将数据从源头传送到最终的目的。
(3)Event由可选的header和载有数据的一个byte array 构成。
(4)载有的数据度flume是不透明的。
(5)Header是容纳了key-value字符串对的无序集合,key在集合内是唯一的。
(6)Header可以在上下文路由中使用扩展
(二)Client
(1)Client 是一个将原始log包装成events并且发送他们到一个或多个agent的实体
(2)目的是从数据源系统中解耦Flume
(3)在flume的拓扑结构中不是必须的。
(4)Client实例
(5)flume log4j Appender
(6)可以使用Client SDK(org.apache.flume.api)定制特定的Client
(三)Agent
(1)一个Agent包含 source ,channel,sink 和其他组件。
(2)它利用这些组件将events从一个节点传输到另一个节点或最终目的地
(3)agent是flume流的基础部分。
(4)flume为这些组件提供了配置,声明周期管理,监控支持。
(四)Agent之Source
(1)Source 负责接收event或通过特殊机制产生event,并将events批量的放到一个或多个Channel
(2)包含event驱动和轮询两种类型。
(3)不同类型的Source
(4)与系统集成的Source:Syslog,Netcat,监测目录池
(5)自动生成事件的Source:Exec
(6)用于Agent和Agent之间通信的IPC source:avro,thrift
(7)source 必须至少和一个channel关联
(五)Agent之Channel
(1)Channel位于Source和Sink之间,用于缓存进来的event
(2)当sink成功的将event发送到下一个的channel或最终目的 event从channel删除
(3)不同的channel提供的持久化水平也是不一样的
(4)Memorychannel :volatile (不稳定的)
(5)FileChannel:基于WAL( 预写式日志Write-Ahead logging)实现
(6)JDBCchannel :基于嵌入式database实现
(7)channel支持事务,提供较弱的顺序保证
(8)可以和任何数量的source和sink工作
(六)Agent之Sink
(1)Silk负责将event传输到吓一跳或最终目的地,成功后将event从channel移除
不同类型的silk
(2)存储event到最终目的地终端sink,比如 HDFS,HBase
(3)自动消耗的sink 比如 null sink
(4)用于agent间通信的IPC:sink:Avro
(5)必须作用于一个确切的channel
(七)Iterator
作用于Source,按照预设的顺序在必要地方装饰和过滤events
(八)channel selector
允许Source基于预设的标准,从所有channel中,选择一个或者多个channel
(九)sink processor
(1)多个sink 可以构成一个sink group
(2)sink processor 可以通过组中所有sink实现负载均衡
(3)也可以在一个sink失败时转移到另一个
1.9 Flume的优势
(1)Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase
(2)当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供一共平稳的数据.
(3)提供上下文路由特征
(4)Flume的管道是基于事务,保证了数据在传送和接收时的一致性.
(5)Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。
1.10 Flume具有的特征
(1)Flume可以高效率的将多个网站服务器中收集的日志信息存入HDFS/HBase中
(2)使用Flume,我们可以将从多个服务器中获取的数据迅速的移交给Hadoop中
(3)除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如facebook,twitter,电商网站如亚马逊,flipkart等
(4)支持各种接入资源数据的类型以及接出数据类型
(5)支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等
(6)可以被水平扩展
二、Flume架构
2.1数据流
Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,
待数据真正到达目的地后,删除自己缓存的数据
Flume传输的数据的基本单位是Event,如果是文件,通常是一行记录,这也是事务的基本单位。Event从Source,流向Channel
,再到Sink,本身为一个byte数组,并可携带headers信息。Event代表着一个数据流的最小完整单元,从外部数据源过来,向外部的目的地去。
Flume运行的核心是Agent.它是一个完整的数据收集工具,含有三个核心组件,分别是source,channel,sink。通过这些组件,Event可以从一个地方流向另一个地方。
2.2 Flume核心Agent
Flume运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示
2.2.1核心组件之source
Client端操作数据的来源,Flume支持Avro,log4j,syslog,和http(body为json格式)。可以让应用程序同 已有的source直接打交道,如AvroSource,SysconfigTcpSource.也可以写一个source,以IPC或RPC的方式自己接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient 和 ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考官方手册
对现有程序改动最小的使用方式是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有的程序进行任何改动。对于直接读取文件Source有两种方式:
1,ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如 tail -F 文件名 指令,在这种方式下,取得文件名必须是指定的。ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法保证日志数据的完整性
2,SpoolSource:检测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录
SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋于实时。
如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。在实际使用的过程中,可以结合log4j使用,使用log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录
log4j有一个TimeRolling的插件,可以把log4j分割文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED
2.2.2 Flume Source支持的类型
2.2.1核心组件之source
Client端操作数据的来源,Flume支持Avro,log4j,syslog,和http(body为json格式)。可以让应用程序同 已有的source直接打交道,如AvroSource,SysconfigTcpSource.也可以写一个source,以IPC或RPC的方式自己接入自己的应用,Avro和Thrift都可以(分别有NettyAvroRpcClient 和 ThriftRpcClient实现了RpcClient接口),其中Avro是默认的RPC协议。具体代码级别的Client端数据接入,可以参考官方手册
对现有程序改动最小的使用方式是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有的程序进行任何改动。对于直接读取文件Source有两种方式:
1,ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如 tail -F 文件名 指令,在这种方式下,取得文件名必须是指定的。ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法保证日志数据的完整性
2,SpoolSource:检测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录
SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋于实时。
如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。在实际使用的过程中,可以结合log4j使用,使用log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录
log4j有一个TimeRolling的插件,可以把log4j分割文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED
2.2.2 Flume Source支持的类型
2.2.3核心组件之Channel
当前有几个channel可供选择,分别是Memory Channel,JDBC Channel,File Channel,Psuedo Transaction Channel。比较常见的是前三种。
(1)MemoryChannel:可以实现高速的吞入,但是无法保证数据的完整性
(2)MemoryRecoverChannel:在官方文档的件以上已经建议使用FileChannel来替换
(3)FileChannel:保证数据的完整性与一致性。再具体配置FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
File Channel是一个持久化的隧道(Channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使Java虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功的传递到下一个代理(agent),这一切都不会造成数据的丢失。Memory Channel是一个不稳定的隧道,其原因是由于它在内存中存储所有的事件,如果Java进程死掉,任何存储在内存中的数据将会丢失。另外,内存空间受到RAM大小的限制,而File Channel这方面是它的优势,只要磁盘空间足够大,他就可以将所有事件存储到磁盘。
2.2.4 Flume Channel支持的类型
2.2.5核心组件之Sink
Sink在设置存储数据时,可以向文件系统,数据库,hadoop存储据,在日志数据较少时,可以将数据存储在文件中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的数据存储到Hadoop中,便于日后进行相应的数据分析。
2.2.6 Flume Sink支持的类型
2.2.7 Flume Agent内部原理
2.3可靠性
Flume的核心是吧数据源收集起来,再送到目的地。为了保证输送一定成功,再送到目的地之前,会先缓存数据,带数据真正到达目的地后再删除缓存的数据
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event成功存储起来。而Channel的多种实现可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份channel作为备份,而memory channel 将event存在内存queue里,速度快,但丢失的话无法恢复
2.4配置实例
2.3可靠性
Flume的核心是吧数据源收集起来,再送到目的地。为了保证输送一定成功,再送到目的地之前,会先缓存数据,带数据真正到达目的地后再删除缓存的数据
Flume使用事务性的方式保证传送Event整个过程的可靠性。Sink必须在Event被存入Channel后,或者,已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把Event从Channel中remove掉。这样数据流里的event无论是在一个agent里还是多个agent之间流转,都能保证可靠,因为以上的事务保证了event成功存储起来。而Channel的多种实现可恢复性上有不同的保证。也保证了event不同程度的可靠性。比如Flume支持在本地保存一份channel作为备份,而memory channel 将event存在内存queue里,速度快,但丢失的话无法恢复
2.4配置实例
三、Flume拦截器、数据流以及可靠性
3.1 Flume拦截器
当我们需要对数据进行过滤时,除了我们在Source、 Channel和Sink进行代码修改之外, Flume为我们提供了拦截器,拦截器也是chain形式的。
拦截器的位置在Source和Channel之间,当我们为Source指定拦截器后,我们在拦截器中会得到event,根据需求我们可以对event进行保留还是
抛弃,抛弃的数据不会进入Channel中。
3.2 Flume数据流
1)Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。 2) Flume 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。 Event 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组,并可携带 headers 信息。 Event 代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。
比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,
也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是Flume强大之处。如下图所示:
3.3 Flume可靠性
Flume使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 被存入 Channel 后,或者,已经被传达到下一站agent里,又或者,
已经被存入外部数据目的地之后,才能把Event从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,
都能保证可靠,因为以上的事务保证了event会被成功存储起来。比如 Flume支持在本地保存一份文件 channel 作为备份,而memory channel 将
event存在内存 queue 里,速度快,但丢失的话无法恢复。
四、Flume使用场景
Flume在英文中的意思是水道, 但Flume更像可以随意组装的消防水管,下面根据官方文档,展示几种Flow。
4.1 多个agent顺序连接
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的 Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
4.2 多个Agent的数据汇聚到同一个Agent
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为 每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
4.3 多级流
Flume还支持多级流,什么多级流?结合在云开发中的应用来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent 后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
4.4 load balance功能
上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上 。
五、Flume自定义MySQLSource
5.1 自定义Source说明
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source。
如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource。
官方也提供了自定义source的接口:
官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source
5.3 自定义MySQLSource组成
5.2 自定义MySQLSource步骤
根据官方说明自定义MySqlSource需要继承AbstractSource类并实现Configurable和PollableSource接口。
实现相应方法:
getBackOffSleepIncrement()//暂不用
getMaxBackOffSleepInterval()//暂不用
configure(Context context)//初始化context
process()//获取数据(从MySql获取数据,业务处理比较复杂,所以我们定义一个专门的类——SQLSourceHelper来处理跟MySql的交互),封装成Event并写入Channel,这个方法被循环调用
stop()//关闭相关的资源
5.4 代码实现
5.4.1 导入Pom依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
5.4.2 添加配置信息
在ClassPath下添加jdbc.properties和log4j. properties
jdbc.properties:
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8dbUser=rootdbPassword=000000
log4j. properties:
#--------console-----------
log4j.rootLogger=info,myconsole,myfilelog4j.appender.myconsole=org.apache.log4j.ConsoleAppenderlog4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n#log4j.rootLogger=error,myfilelog4j.appender.myfile=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.myfile.File=/tmp/flume.loglog4j.appender.myfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
七、Flume企业真实面试题
7.1 Flume数据传输的监控的
使用第三方框架Ganglia实时监控Flume。
7.2 Flume的Source,Sink,Channel的作用?你们Source是什么类型?
1、作用
(1)Source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
(2)Channel组件对采集到的数据进行缓存,可以存放在Memory或File中。
(3)Sink组件是用于把数据发送到目的地的组件,目的地包括Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。
2、我公司采用的Source类型为:
(1)监控后台日志:exec
(2)监控后台产生日志的端口:netcat
Exec spooldir
7.3 Flume的Channel Selectors
7.4 Flume参数调优
1. Source
增加Source个(使用Tair Dir Source时可增加FileGroups个数)可以增大Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个Source 以保证Source有足够的能力获取到新产生的数据。
batchSize参数决定Source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。
2. Channel
type选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据。type选择file时Channel的容错性更好,但是性能上会比memory channel差。
使用file Channel时dataDirs配置多个不同盘下的目录可以提高性能。
Capacity参数决定Channel可容纳最大的event条数。transactionCapacity 参数决定每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数。transactionCapacity需要大于Source和Sink的batchSize参数。
3. Sink
增加Sink的个数可以增加Sink消费event的能力。Sink也不是越多越好够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。
batchSize参数决定Sink一次批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬出event的性能。
7.5 Flume的事务机制
Flume的事务机制(类似数据库的事务机制):Flume使用两个独立的事务分别负责从Soucrce到Channel,以及从Channel到Sink的事件传递。比如spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到Channel且提交成功,那么Soucrce就将该文件标记为完成。同理,事务以类似的方式处理从Channel到Sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到Channel中,等待重新传递。
7.6 Flume采集数据会丢失吗
不会,Channel存储可以存储在File中,数据传输自身有事务。
网友评论