美文网首页
Flume(四)Flume Agent内部原理

Flume(四)Flume Agent内部原理

作者: 万事万物 | 来源:发表于2021-06-07 06:28 被阅读0次
image.png
  1. Source:用于接收数据(文本、数据库、hdfs都可以充当数据源),若数据比较特殊,那么可以进行自定义。
  2. ChannelProcessor:在Source中调用ChannelProcessor,用于获取Channel。
  3. 会将该事件交给拦截器链(Interceptor),经过一系列拦截器链处理完成之后,又会返回给ChannelProcessor。
  4. ChannelProcessor会将处理后的事件交由Channel选择器(ChannelSelector)。
    source可以绑定多个Channel,那么数据来了交由哪个Channel呢?这是就用到了ChannelSelector,根据channel选择器的不同,处理也会不同。

ChannelSelector

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)
ReplicatingSelector会将同一个Event发往所有的Channel,
Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

  1. 事件经过ChannelSelector之后,会给事件打上“标记”,告诉ChannelProcessor是Replicating 选择器还是 Multiplexing。
  2. 根据Channel选择器的选择结果,将事件写入相应的Channel。如果是Replicating选择器,那么所有的Channel都会分发。如果是Multiplexing选择器会分发给对应的选择器。直到这里才开始put事务,毕竟这里才真正开始往channel中写入数据,doput,doCommit等操作。
  3. 接下里开始对sink打交道。SinkProcessor有三种:DefaultSinkProcessor、LoadBalancingSinkProcessor、FailoverSinkProcessor。sink并不是向channel那样成为选择器,而是称为组(SinkGroup)。

DefaultSinkProcessor:只接收一个sink,虽然也成为sink组,但是并没有组的概念。
LoadBalancingSinkProcessor:负载均衡,防止单个sink压力过大。
FailoverSinkProcessor:故障转移,防止某个sink宕机造成数据丢失。


ChannelSelector(Channel选择器)

官方文档flume-channel-selectors
Replicating Channel Selector (default): 复制选择器

  1. 默认的Channel选择器
  2. 如何配置?
a1.sources = r1
# 定义
a1.channels = c1 c2 c3
# 这个可以不用写,毕竟默认的就是他
a1.sources.r1.selector.type = replicating
# 绑定
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

Multiplexing Channel Selector:多路复用
配置

a1.sources = r1
# 定义 channel
a1.channels = c1 c2 c3 c4
#这里就必须指定了
a1.sources.r1.selector.type = multiplexing
# 这是给Event中的header(默认是空的)中添加一个state的key
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

state会作为Event 中 header的一个key,CZ、US 会作为 state的value。如果给定的是 CZ,那么这个选择器会给c1,如果value是US,那么选择器会给 c2和c3。若 CZ 和US 都没选择上那么会给 c4。这里需要结合拦截器来用

Custom Channel Selector:自定义选择器
除了以上两种外,可以使用 *Custom Channel Selector,自定义选择器。还没用过,先了解一下


SinkProcessor

官方文档flume-sink-processors
Default Sink Processor:
默认的 sink组,只能接收一个sink。可以不用写sink测试。
Failover Sink Processor:故障转移,防止其他sink宕机造成数据丢失
配置:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor:负载均衡

  1. 配置:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
  1. a1.sinkgroups.g1.processor.backoff
    这是一种退避算法,需要结合 processor.selector.maxTimeOut(默认为30秒)来使用。
  2. a1.sinkgroups.g1.processor.selector:
    若使用 Load balancing Sink Processor 来实现负载均衡,那么负载均衡的策略可使用processor.selector定义。包括 轮询、随机、自定义,默认为轮询。
    processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector

什么是退避算法

image.png 第一次:以轮询的方式将数据发送给sink1,2,3。
第二次:准备发送数据时,发现sink 挂掉了,此时就不会将sink1发送数据了,而是轮询给其他服务器(sink2,sink3) ,并且在processor.selector.maxTimeOut时间范围内,不会在访问sink1。
第三次:若已经超过了 processor.selector.maxTimeOut 时间范围,便会再访问一次 sink1。若sink1依旧访问不了,那么会将原来的processor.selector.maxTimeOut 时间翻一倍。如1,2,4,8...
实现方式参考:https://www.iteye.com/blog/qiuqiang1985-1513049

相关文章

网友评论

      本文标题:Flume(四)Flume Agent内部原理

      本文链接:https://www.haomeiwen.com/subject/ugxmsltx.html