Apache Flume 1.6.0 用户指南

作者: rabbitGYK | 来源:发表于2016-07-03 23:11 被阅读5970次

    博客原文

    翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档

    引言

    概述

    Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。

    Apache Flume的作用不仅限于日志汇总,因为数据源是可以自定义的,Flume也可以被用于传输大量的事件数据,包括但不限于网络流量数据、社交媒体产生的数据、电子邮件和几乎所有可能的数据源。

    Apache Flume是Apache软件基金会的顶级项目。

    目前有两个已发布的代码版本可以使用,分别是0.9.x和1.x.

    0.9.x系列的文档获取方式是 Flume 0.9.x User Guide.

    这一篇文档适用于1.4.x系列版本。

    鼓励新用户和现存用户使用1.x系列的发布版本,因为在最新的版本中具有显著的性能提升和灵活配置的特性。

    系统要求

    1. java运行环境-java 1.6或者更新(推荐使用1.7)
    2. 内存-有足够的内存满足于sources、channels、和sinks的配置
    3. 存储空间-有足够的空间满足于channels和sinks的配置
    4. 目录权限-agent使用的目录要有读写权限

    架构

    数据流模型

    Flume event是一个数据流的基本单元,包含一个字节的载荷和一些可选的字符属性;Flume agent是一个JVM进程包含一些组件,通过这些组件,event从一个外部源流向下一个目的地(即一跳的传输)。


    图片来自官网

    Flume source消费像web server一样的外部source交付给它的event,外部source是以一种可以被目标Flume source识别的格式向Flume发送event。例如,一个Avro Flume source可以接收Avro event,这些event可以来自Avro client,也可以来自流中其它的Flume agent从Avro sink中发送的event。一个类似的数据流可以是这样的,Thrift Flume Source可以接收的event包括来自Thrift Sink,来自Flume Thrift RPC Client,来自其他任何语言根据Flume Thrift协议实现的Thrift Client。当Flume Source接收到一个event时,它会将其存储到一个或者多个Channel中,Channel是一个被动存储,它会存储接收的event一直到该event被Flume Sink消费掉。就拿file channel举例,它是依靠本地文件系统的。Sink将event从channel中清除掉,然后把它放到一个外部的仓库(如HDFS)中,这可以通过Flume HDFS Sink实现,或者把它传递给整个流中下一个Flume agent(即下一跳)的Flume Source。因为有channel的存储,在一个给定的agent中的Source和Sink可以分阶段地异步地处理event。

    复杂数据流

    Flume允许用户创建多跳的流程,其中的event在到达最终的目的地之前会经过多个agent。Flume也允许扇入扇出(扇入:一个模块被多个模块调用,扇出:一个模块调用多个模块)模式的数据流,基于上下文的路由和作为失败跳转的备份路由(故障转移)。

    可靠性

    在每一个agent中的event都是存储在相应的channel中的,然后这些event才会被传送到下一个agent或者最终的存储系统(如HDFS),这些event只用存储在下一个agent中的channel中或者最终存储系统中之后才会从当前的channel中清除。这也就是Flume在一次跳转的数据流中如何提供端到端可靠性的。

    Flume使用一种事务性的方法来保证event传送过程中的可靠性,Source和Sink封装在了一个存储/检索event的事务中,分别对应channel提供的放入/取出的事务,这就保证了event在数据流的点对点传输过程中的可靠性。在多跳数据流的情形下,上一跳的sink和下一跳的source它们在一个事务中运行,确保数据能够安全地存储到下一跳的channel中。

    可恢复性

    Event被存储在channel中,同时channel管理着事务的失败恢复。Flume有一个基于本地文件系统的可持久化的文件channel,还有一个内存channel,它能简单地将event存储在内存队列中,它的速度更快,但是当一个agent进程死掉的时候,还在channel中的event就不能恢复了。

    安装

    安装agent

    Flume agent的配置是存储在本地的配置文件中的,这是一个文本文件,它是java properties文件格式,可以在一个配置文件中指定一个或者多个agent的配置。这个配置文件包括一个agent中每一个source,sink和channel的属性,而且还有它们是如何绑在一些形成数据流的。

    配置单个组件

    在数据流中的每一个组件(source,sink或者channel)都有一个name,type和一些跟这些type和实例相关的属性,例如,一个Avro source需要一个hostname(或者Ip地址)和一个端口号来接收数据,一个内存channel需要有最大的队列长度(容量),一个HDFS sink需要一个已知的文件系统URI,创建文件路径和文件的循环频次(hdfs.rollInterval)等等。一个组件的所有这些属性都被设置在Flume agent的properties文件中。

    组件串联

    agent需要知道哪一个组件需要加载,它们是怎样有序地连接在一起组成一个数据流的。这就需要列举出一个agent中的每一个source,sink和channel,并且给每一个source和sink指定连接的channel。例如,一个agent从一个叫做“avroWeb”的Avro source中将event传送到一个叫做“hdfs-cluster1”的HDFS sink中,它们之间是通过一个叫做“file-channel”的文件channel,因此在配置文件中需要包含这些组件的名字和avroWeb source and hdfs-cluster1 sink共享的通道file-channel。

    启动一个agent

    agent是用一个叫做flume-ng的shell脚本启动的,这个脚本位于flume的bin目录下面,执行时你需要指定agent的名字,配置文件目录和配置文件,命令如下:

    $ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
    

    现在agent将启动运行指定properties文件配置的source和sink。

    一个简单的例子

    这里我们给出了一个配置文件的例子,描述了一个单点的flume的部署,这个配置文件能使用户生产的event顺序地打印在console控制台上。

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    在这个配置文件里面仅定义了一个agent,它的名字叫a1,a1有一个source监听的是端口44444的数据,有一个channel是在内存中缓存event数据,还有一个sink将event数据打印到console控制台。在这个配置文件中配置了多个组件,然后描述了它们的type和配置参数。一个给定的配置文件可以指定多个不同名字的agent,当一个flume进程启动的时候,一个标志会被传进去告诉它启动哪一个agent。
    考虑这个配置文件,我们可以使用下面的命令来启动flume:

    $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
    

    注:在实际的部署中我们通常会多包含一个参数:--conf=<conf-dir>,这个目录<conf-dir>包含一个shell脚本 flume-env.sh 和一个强大的log4j的properties文件,在这个例子中我们还传进去了一个java选项,强制Flume将日志打印在控制台console,而不是用定制的环境脚本。
    在另外一个终端我们可以telnet端口44444,发送给flume一个event:

    $ telnet localhost 44444
    Trying 127.0.0.1...
    Connected to localhost.localdomain (127.0.0.1).
    Escape character is '^]'.
    Hello world! <ENTER>
    OK
    

    在原来的flume的终端上将会输出这个event的log信息:

    12/06/19 15:32:19 INFO source.NetcatSource: Source starting
    12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
    12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }
    

    祝贺,你已经成功地配置并部署了一个Flume agent,在接下来的部分会更加详细地介绍agent的配置信息。

    基于Zookeeper的配置

    Flume支持通过zookeeper配置agent,但这是一个实验性特性,配置文件需要上传到zookeeper上去,在一个可配置的前缀下面,这个配置文件被存储在zookeeper的节点数据中,下面是zookeeper的节点树,比如 agent a1和a2.

    - /flume
     |- /a1 [Agent config file]
     |- /a2 [Agent config file]
    

    配置文件上传到zookeeper之后,agent的启动命令如下:

    $ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
    

    参数解释如下:

    参数名 默认值 描述
    z - zookeeper的连接字符串,hostname:port列表用逗号分隔
    p /flume zookeeper中存储agent配置文件的根路径

    安装第三方插件

    Flume有一个完整的基于插件的架构。Flume自带了很多开箱即用的source,sink,channel,serializer,并且在发Flume之外存在很多第三方的实现。
    当然你也可以自定义很多组件,只要将它们的jar包放在flume-env.sh文件中的FLUME_CLASSPATH的变量中就可以。Flume现在支持一个特殊的目录叫plugins.d,其中按照一定格式打包的插件能够被自动检测到,这样使得插件的打包管理很方便,而且也使调试和问题跟踪变得简单了,尤其是依赖包的冲突问题。

    plugins.d目录

    plugins.d目录的位置是$FLUME_HOME/plugins.d. 在启动的时候,flume-ng的启动脚本会检查plugins.d目录下符合以下格式的插件,把它们包含到java启动时合适的路径上。

    插件的目录布局

    plugins.d目录下面的每一个插件(子目录)可以有三个子目录:

    1. lib - 存放插件的jar包。
    2. libext - 插件依赖的jar包。
    3. native - 存放任何的本地依赖包,如.so文件。

    下面有两个存储在目录plugins.d里的插件的例子:

    plugins.d/
    plugins.d/custom-source-1/
    plugins.d/custom-source-1/lib/my-source.jar
    plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
    plugins.d/custom-source-2/
    plugins.d/custom-source-2/lib/custom.jar
    plugins.d/custom-source-2/native/gettext.so
    

    数据的采集

    Flume支持很多种从外部source中采集数据的方式。

    RPC

    在flume的发布版本中包含了一个Avro的客户端,它能通过avro RPC方式发送一个指定的文件到Flume Avro source中:

    bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
    

    上面的命令将会发送文件/usr/logs/log.10的内容到监听在端口上的Flume source。

    执行命令

    有一个exec source,它可以执行一个给定的命令,然后消费这个输出,输出是个单行文本,文本以\r或者\n或者\r\n结尾。

    注:Flume不支持tail命令的source,有一个可以使用exec source执行的tail命令,同样可以流式的输出文件。

    网络流

    Flume支持以下几种方式从常见的日志流中读取数据:

    1. Avro
    2. Thrift
    3. Syslog
    4. Netcat

    配置多个agent的数据流

    图片来自官网

    为了能使数据流跨越多个agent或者跳,前一个agent的sink和当前一跳的source需要同样是avro类型的,并且sink需要指定source的hostname(或者ip地址)和端口号。

    数据流合并

    在做日志收集的时候一个常见的场景就是,大量的生产日志的客户端发送数据到少量的附属于存储子系统的消费者agent。例如,从数百个web服务器中收集日志,它们发送数据到十几个负责将数据写入HDFS集群的agent。

    图片来自官网

    这个可在Flume中可以实现,需要配置大量第一层的agent,每一个agent都有一个avro sink,让它们都指向同一个agent的avro source(强调一下,在这样一个场景下你也可以使用thrift source/sink/client)。在第二层agent上的source将收到的event合并到一个channel中,event被一个sink消费到它的最终的目的地。

    数据流复用

    Flume支持多路输出event流到一个或多个目的地。这是靠定义一个多路数据流实现的,它可以实现复制和选择性路由一个event到一个或者多个channel。

    图片来自官网
    上面的例子展示了agent foo中source扇出数据流到三个不同的channel,这个扇出可以是复制或者多路输出。在复制数据流的情况下,每一个event被发送所有的三个channel;在多路输出的情况下,一个event被发送到一部分可用的channel中,它们是根据event的属性和预先配置的值选择channel的,例如一个event的txnType属性的值是customer,这个它应该被发送到channel1和channel3,如果值是vendor,它应该被发送到channel2,如果没有到达channel2则会被发送到channel3. 这些映射关系应该被填写在agent的配置文件中。

    配置

    在上面几部分中都有提到,Flume agent的配置是从一个文件中读取的,该文件是一个具有层次结构的java properties文件。

    数据流的定义

    为了定义单个agent内部的数据流,你需要通过channel将source和sink连接起来。你需要列出给定agent的source,sink和channel,然后将source和sink指向一个channel。一个source实例可以指定多个channel,但是一个sink只能指定一个channel。相关配置格式如下:

    # list the sources, sinks and channels for the agent
    <Agent>.sources = <Source>
    <Agent>.sinks = <Sink>
    <Agent>.channels = <Channel1> <Channel2>
    
    # set channel for source
    <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
    
    # set channel for sink
    <Agent>.sinks.<Sink>.channel = <Channel1>
    

    例如,一个名为agent_foo,从一个外部的avro客户端读取数据,然后通过一个内存channel将数据发送到HDFS系统中。配置文件weblog.config看起来是下面这样的:

    # list the sources, sinks and channels for the agent
    agent_foo.sources = avro-appserver-src-1
    agent_foo.sinks = hdfs-sink-1
    agent_foo.channels = mem-channel-1
    
    # set channel for source
    agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
    
    # set channel for sink
    agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
    

    这样的配置会使event通过 mem-channel-1 从source avro-appserver-src-1 中流向sink hdfs-sink-1。当一个agent使用weblog.config作为它的配置文件启动时,它将实例化那个数据流。

    单个组件的配置

    定义过了数据流之后,你需要设置每一个组件(source、sink、channel)的属性。在properties文件中为每个组件的type指定值,都在同一个有层次的命名空间下完成的:

    # properties for sources
    <Agent>.sources.<Source>.<someProperty> = <someValue>
    
    # properties for channels
    <Agent>.channel.<Channel>.<someProperty> = <someValue>
    
    # properties for sinks
    <Agent>.sources.<Sink>.<someProperty> = <someValue>
    

    每一个组件都需要设置type属性,这样Flume才能理解需要什么类型的对象。每个source、sink和channel都有属于自己的一组属性,来实现它们预期的功能。所有这些属性都是按需设置的,在前一个例子中,我们有一个数据流是通过内存channel mem-channel-1avro-AppSrv-sourcehdfs-Cluster1-sink,下面是一个展示其中每一个组件的配置的例子:

    agent_foo.sources = avro-AppSrv-source
    agent_foo.sinks = hdfs-Cluster1-sink
    agent_foo.channels = mem-channel-1
    
    # set channel for sources, sinks
    
    # properties of avro-AppSrv-source
    agent_foo.sources.avro-AppSrv-source.type = avro
    agent_foo.sources.avro-AppSrv-source.bind = localhost
    agent_foo.sources.avro-AppSrv-source.port = 10000
    
    # properties of mem-channel-1
    agent_foo.channels.mem-channel-1.type = memory
    agent_foo.channels.mem-channel-1.capacity = 1000
    agent_foo.channels.mem-channel-1.transactionCapacity = 100
    
    # properties of hdfs-Cluster1-sink
    agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
    agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
    
    #...
    

    在一个agent中配置多个数据流

    在一个单一的Flume agent中可以包含多个独立的数据流,在一个配置文件中你可以列出多个source、sink 和 channel。这些组件会被连接起来形成多个数据流:

    # list the sources, sinks and channels for the agent
    <Agent>.sources = <Source1> <Source2>
    <Agent>.sinks = <Sink1> <Sink2>
    <Agent>.channels = <Channel1> <Channel2>
    

    然后你可以利用相应的channel将对应的 source 和 sink 连接起来形成两个不同的数据流。例如,如果你需要在一个agent中配置两个数据流,一个来自外部的avro客户端流向外部的HDFS,另一个来自外部的 tail 流向avro sink,需要下面这样的配置:

    # list the sources, sinks and channels in the agent
    agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
    agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
    agent_foo.channels = mem-channel-1 file-channel-2
    
    # flow #1 configuration
    agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
    agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
    
    # flow #2 configuration
    agent_foo.sources.exec-tail-source2.channels = file-channel-2
    agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
    

    多agent数据流的配置

    为了配置一个多层数据流,你需要将第一跳的 avro/thrift sink指向下一跳的 avro/thrift source。这样第一个Flume agent就会将event传送到下一个flume agent中。例如,如果你使用avro 客户端周期性地传送一个文件(每个event一个文件)到本地的flume agent中,然后这个本地agent将它传送到下一个agent,然后把它存储起来。

    Weblog agent 配置信息:

    # list sources, sinks and channels in the agent
    agent_foo.sources = avro-AppSrv-source
    agent_foo.sinks = avro-forward-sink
    agent_foo.channels = file-channel
    
    # define the flow
    agent_foo.sources.avro-AppSrv-source.channels = file-channel
    agent_foo.sinks.avro-forward-sink.channel = file-channel
    
    # avro sink properties
    agent_foo.sources.avro-forward-sink.type = avro
    agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
    agent_foo.sources.avro-forward-sink.port = 10000
    
    # configure other pieces
    #...
    

    HDFS agent 配置信息:

    # list sources, sinks and channels in the agent
    agent_foo.sources = avro-collection-source
    agent_foo.sinks = hdfs-sink
    agent_foo.channels = mem-channel
    
    # define the flow
    agent_foo.sources.avro-collection-source.channels = mem-channel
    agent_foo.sinks.hdfs-sink.channel = mem-channel
    
    # avro sink properties
    agent_foo.sources.avro-collection-source.type = avro
    agent_foo.sources.avro-collection-source.bind = 10.1.1.100
    agent_foo.sources.avro-collection-source.port = 10000
    
    # configure other pieces
    #...
    

    这样我们就将 weblog agent 的 avro-forward-sink 和 hdfs agent 的 avro-collection-source 连接起来了。最后,来自外部 appserver source 的 event 就存储在了HDFS中。

    扇出数据流

    在前面部分我们提到,Flume支持从一个source中扇出数据流到多个 channel 中,Flume有两种扇出模式,复制和多路复用。在复制数据流中,event 被发送到所有配置的channel中,在多路复用数据流中,event只被发送到特定的channel(所有channel的一个子集)中。为了扇出数据流,需要为一个source指定一个channel列表和扇出策略,扇出策略是通过添加channel 的 Selector 完成的,selector 的类型可能是复制也可能是多路复用,如果是多路复用,需要进一步配置选择规则,如果你不指定一个 Selector 默认就是复制:

    # List the sources, sinks and channels for the agent
    <Agent>.sources = <Source1>
    <Agent>.sinks = <Sink1> <Sink2>
    <Agent>.channels = <Channel1> <Channel2>
    
    # set list of channels for source (separated by space)
    <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
    
    # set channel for sinks
    <Agent>.sinks.<Sink1>.channel = <Channel1>
    <Agent>.sinks.<Sink2>.channel = <Channel2>
    
    <Agent>.sources.<Source1>.selector.type = replicating
    

    多路选择器为了使数据分流需要进一步的设置,这需要指定一个event的mapping属性,使event流向指定的channel。selector会检查在event header中配置的每个属性,如果它匹配到了特定的值,event会被发送到那个值映射到的所有channel中,如果没有匹配的,event会被发送到若干默认配置的channel:

    # Mapping for multiplexing selector
    <Agent>.sources.<Source1>.selector.type = multiplexing
    <Agent>.sources.<Source1>.selector.header = <someHeader>
    <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
    <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
    <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
    #...
    
    <Agent>.sources.<Source1>.selector.default = <Channel2>
    

    每个值的mapping允许channel重叠。
    下面有一个单个数据流通过多路选择流向两个路径例子,名叫agent_foo的agent有一个avro source 两个 channel 连接着两个 sink:

    # list the sources, sinks and channels in the agent
    agent_foo.sources = avro-AppSrv-source1
    agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
    agent_foo.channels = mem-channel-1 file-channel-2
    
    # set channels for source
    agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
    
    # set channel for sinks
    agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
    agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
    
    # channel selector configuration
    agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
    agent_foo.sources.avro-AppSrv-source1.selector.header = State
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
    

    通过上面的配置,Selector 会检查event中一个名叫 State 的 header,如果它的值是CA,这样的event会被发送到 channel mem-channel-1 中,如果它的值是 AZ ,这样的event会被发送到 channel file-channel-2 中,如果它的值是 NY,这样的event则会被发送到两个channel中,如果State header没有设值或者没有匹配到三个中的任何一个,这样的event会被发送到 default 设置的 mem-channel-1 中。

    Selector 还支持可选择的 channel,为了设置可选择的 channel 使用了一个 header,在下面的参数中配置参数optional 被使用:

    # channel selector configuration
    agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
    agent_foo.sources.avro-AppSrv-source1.selector.header = State
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
    

    Selector 首先尝试写入第一个必需的channels,如果这些消费event的channel中有一个失败,则整个事务失败,事务会再次尝试所有的这些channel,如果所有的必需的channel都消费到了event,然后selector会尝试写入可选的channel中,如果这些可选的channel中的任何一个消费event失败,都会被简单的忽略,而不会发起再次尝试。

    如果在一个指定的header的可选的channel和必需的channel有重叠,这个channel 会被认为是必须的,这个channel的失败同样会导致所有必需channel的重试。举个例子,在上面的配置样例中,header值等于 CA 的 channnel mem-channel-1 是被认为是必需的 channel,尽管它在必需的和可选的中都有标记,一旦写入这个channel失败,将会导致失败的event重试 Selector 中配置的所有的channel。

    注意,如果一个header 没有任何的必需的channel,然后这个event会被写入到默认的channel中,并且尝试写入到这个header的可选channel中。如果在没有指定必需的channel的情况下,设置可选的channel会导致event被写入默认的channel 中 。如果没有channel被指定为默认的,也没有必需的channel,Selector 会尝试将event写入到可选的channel中,在这种情况下,任何的失败都会被忽略。

    Flume Sources

    Avro Source

    监听在Avro端口上接收来自外部的Avro客户端的数据流,当它与内嵌在另一个Flume agent (上一跳)的Avro Sink 成对出现时,它可以创建出分层的信息收集拓扑结构,下表中加粗的是必需的属性。

    参数名 默认值 描述
    channels -
    type - 组件的类型名,值必须是 avro
    bind - 监听的主机名或者IP地址
    port - bind 的端口号
    threads - 能产生的最大线程数
    selector.type
    selector.*
    interceptors - 以空格分隔的interceptor列表
    interceptors.*
    compression-type none 它的值可以是“none”或者“deflate”,但是必须和AvroSource匹配。
    ssl false 设其值为true时,启用SSL加密,同时还必须指定一个 “keystore” 和 “keystore-password”。
    keystore - 它的值是 Java keystore 的路径,SSL必需的。
    keystore-password - 为 Java keystore 设置密码,SSL必需的。
    keystore-type JKS Java keystore 的类型,它的值可以是 “JKS” 或者 “PKCS12”。
    exclude-protocols SSLv3 由空格分隔的要排除的SSL/TLS 协议列表,除了指定的协议之外,SSLv3总是被排除在外。
    ipFilter false 其值设置为true时,启用netty的ipFiltering。
    ipFilterRules - 使用此配置定义N netty ipFilter的模式规则

    一个名叫 a1 的 agent 的例子:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4141
    

    ipFilterRules 的例子:
    ipFilterRules 定义的 N netty ipFilters 的规则必须是逗号隔开的,并且必须是下面的格式:
    <’allow’ or deny>:<’ip’ or ‘name’ 机器名>:<pattern> 或者 allow/deny:ip/name:pattern
    例子:
    ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
    注,匹配到的第一条规则适用于下面的例子,在本地客户端上展示。
    配置“allow:name:localhost,deny:ip:,这将允许来自本地的客户端,拒绝来自其它任何ip的客户端,配置deny:name:localhost,allow:ip:,这将拒绝来自本地的客户端允许其它任何ip的客户端。

    Thrift Source

    监听 Thrift 端口,接收外部 Thrift 客户端流进来的 event。当它与内嵌有Thrift Sink的另一个(上一跳)Flume agent成对出现时,它能创建分层的数据收集拓扑结构。Thrift Source可以通过配置kerberos 认证实现在安全模式下启动,agent-principal 和 agent-keytab 是 Thrift Source 向 kerberos KDC 授权的两个属性。粗体是必须的属性。

    参数名 默认值 描述
    channels -
    type - 组件的类型名,值必须是 thrift
    bind - 监听的主机名或者IP地址
    port - bind 的端口号
    threads - 能产生的最大线程数
    selector.type
    selector.*
    interceptors - 以空格分隔的interceptor列表
    interceptors.*
    ssl false 设其值为true时,启用SSL加密,同时还必须指定一个 “keystore” 和 “keystore-password”。
    keystore - 它的值是 Java keystore 的路径,SSL必需的。
    keystore-password - 为 Java keystore 设置密码,SSL必需的。
    keystore-type JKS Java keystore 的类型,它的值可以是 “JKS” 或者 “PKCS12”。
    exclude-protocols SSLv3 由空格分隔的要排除的SSL/TLS 协议列表,除了指定的协议之外,SSLv3总是被排除在外。
    kerberos false 值为true时,启用kerberos认证,在kerberos模式下,agent-principal 和 agent-keytab是成功认证通过必需的属性,Thrift source在安全模式下,仅接受来自启用kerberos并且成功通过kerberos KDC认证的thrift客户端的连接。
    agent-principal - 它被用于 thrift source 向 kerberos KDC 授权时使用。
    agent-keytab - keytab的位置和agent-principal结合使用。

    一个名为a1的agent的例子:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = thrift
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4141
    

    Exec Source

    Exec Source 可以运行一个unix命令,这个命令是启动命令或者是一个持续在标准输出上生产数据的进程(stderr会被简单的抛弃,除非配置logStdErr这个属性的值为true)。如果这个进程不管任何原因退出,这个resource也会退出,并且也不会产生进一步的数据。这意味着配置例如cat[namedpipe]或者tail-F[file] 这样的命令来产生我们希望的结果,而命令 date 不一样,前面两个命令将产生一个数据流,而后面的一个命令只会产生一个event而退出。 粗体是必需的参数。

    参数名 默认值 描述
    channels -
    type - 组件的类型名,值必须是 exec
    command - 要执行的命令
    shell - 用于执行command的shell外壳,例如 /bin/sh -c,仅在使用的命令依赖某种shell类型时,这个属性是必需的,比如通配符、返回标记、管道符等。
    restartThrottle 10000 在尝试重新启动之前等待的总时长(单位毫秒)
    restart false 如果进程死掉了是否执行命令重新启动
    logStdErr false 命令的标准错误输出是否应该被记录
    batchSize 20 同时读取和发送到channel的最大行数
    batchTimeout 3000 如果数据在被推到下游之前,缓冲区的大小未达到,要等待的时长(单位毫秒)
    selector.type replicating replicating(复制) 或者 multiplexing(多路)
    selector.*
    interceptors - 以空格分隔的interceptor列表
    interceptors.*

    警告:ExecSource和其它的异步的source都有的一个问题是,如果有一个event放入channel失败了,source不可能保证客户端会知道。在这种情况下会出现数据丢失。作为一个实例,我们最常需要的一个功能是 tail-F[file] ,使用场景是当一个应用在硬盘上写日志文件的时候,Flume tail 那个文件,然后将每一行当作一个event发送出去。有一个很可能出现的很明显的问题是,如果channel被填满了,Flume不能发送event了会发生什么事情?Flume没有办法知道应用是正在写日志文件需要它保存日志呢,还是由于某种原因event没有被发送呢。如果这是没有意义的,你仅需要知道它:当你使用一个单向的、异步的接口(如ExecSource)时,你的应用可能从来都不能保证数据已经被发送出去了!作为这个警告的延伸,你要完全清楚一件事,使用这个source,event的发送完全是零保证的。为了有更强的可靠性保证,可以考虑使用 Spooling Directory Source 或者使用flume的SDK直接集成到应用中去。

    and

    注意:你可以模仿Flume 0.9x(flume og)的 TailSource 来使用 ExecSource。它只能使用unix命令 tail-F/full/path/to/your/file。在这种情况下,参数 -F-f 更好,因为它支持文件的轮替。

    一个名为a1的agent的例子:

    a1.sources = r1
    a1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /var/log/secure
    a1.sources.r1.channels = c1
    

    shell 参数被用于配置执行 command 的一个shell(如Bash 或 Powershell), command 的值会被作为一个参数传给 shell 执行,这就允许 command 使用一些该 shell 的特性,例如通配符、返回标记、管道、循环、条件等。如果缺少 shell 的配置,command 将会被直接调用。shell 常见的值包括/bin/sh -c, /bin/ksh -c, cmd /c, powershell -Command 等。

    a1.sources.tailsource-1.type = exec
    a1.sources.tailsource-1.shell = /bin/bash -c
    a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
    

    JMS Source

    JMS Source 从 JMS 的目的地(如队列、主题)中读取消息。作为一个 JMS 应用应该能够和任何的 JMS 提供者一起工作,但是目前仅测试过 ActiveMQ 。JMS Source 可配置 batch size, message selector, 用户名/密码, 和从 message 到 flume event 的转码器。注意供应商提供的 JMS jar 包应该被包含在Flume的classpath中,首选放在plugins.d文件夹下,或者在命令行中使用 –classpath 参数,再或者通过在文件 flume-env.sh 中定义的 FLUME_CLASSPATH 设置。
    粗体是必需的参数。


    每天持续更新中...

    相关文章

      网友评论

      • geverway:看英文文档有些疑惑,再看你的译文,觉得豁然开朗。
        发现了文章一处配置错误,关于多 agent 数据流的:

        # avro sink properties
        agent_foo.sources.avro-forward-sink.type = avro
        agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
        agent_foo.sources.avro-forward-sink.port = 10000

        应该是:
        agent_foo.sinks.avro-forward-sink.type = avro
        agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
        agent_foo.sinks.avro-forward-sink.port = 10000
        rabbitGYK:@geverway 有了一定基础概念之后,建议还是看官方英文文档
        rabbitGYK:@geverway 谢谢指正
      • 掂吾掂:想请教一下作者...我最近项目里要用到spark streaming来处理实时的日志文件,因为项目不是很复杂,所以我想用flume实时获取日志文件,然后传给spark streaming去处理...但是还没有思路...
        rabbitGYK:@掂吾掂 flume是日志收集的agent即客户端,redis做消息队列是缓冲,方便spark 消费
        掂吾掂:@rabbitGYK 那就是说,不用flume?直接用redis做消息队列发给spark streaming?
        rabbitGYK:@掂吾掂 使用redis做消息队列

      本文标题:Apache Flume 1.6.0 用户指南

      本文链接:https://www.haomeiwen.com/subject/okbedttx.html