image.png
- Source:用于接收数据(文本、数据库、hdfs都可以充当数据源),若数据比较特殊,那么可以进行自定义。
- ChannelProcessor:在Source中调用ChannelProcessor,用于获取Channel。
- 会将该事件交给拦截器链(Interceptor),经过一系列拦截器链处理完成之后,又会返回给ChannelProcessor。
- ChannelProcessor会将处理后的事件交由Channel选择器(ChannelSelector)。
source可以绑定多个Channel,那么数据来了交由哪个Channel呢?这是就用到了ChannelSelector,根据channel选择器的不同,处理也会不同。
ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是
Replicating(复制
)和Multiplexing(多路复用)
。
ReplicatingSelector
会将同一个Event发往所有的Channel,
Multiplexing
会根据相应的原则,将不同的Event发往不同的Channel。
- 事件经过ChannelSelector之后,会给事件打上“标记”,告诉ChannelProcessor是Replicating 选择器还是 Multiplexing。
- 根据Channel选择器的选择结果,将事件写入相应的Channel。如果是Replicating选择器,那么所有的Channel都会分发。如果是Multiplexing选择器会分发给对应的选择器。
直到这里才开始put事务
,毕竟这里才真正开始往channel中写入数据,doput,doCommit等操作。 - 接下里开始对sink打交道。SinkProcessor有三种:DefaultSinkProcessor、LoadBalancingSinkProcessor、FailoverSinkProcessor。sink并不是向channel那样成为选择器,而是称为组(SinkGroup)。
DefaultSinkProcessor:只接收一个sink,虽然也成为sink组,但是并没有组的概念。
LoadBalancingSinkProcessor:负载均衡,防止单个sink压力过大。
FailoverSinkProcessor:故障转移,防止某个sink宕机造成数据丢失。
ChannelSelector(Channel选择器)
官方文档flume-channel-selectors
Replicating Channel Selector (default): 复制选择器
- 默认的Channel选择器
- 如何配置?
a1.sources = r1
# 定义
a1.channels = c1 c2 c3
# 这个可以不用写,毕竟默认的就是他
a1.sources.r1.selector.type = replicating
# 绑定
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
Multiplexing Channel Selector:多路复用
配置
a1.sources = r1
# 定义 channel
a1.channels = c1 c2 c3 c4
#这里就必须指定了
a1.sources.r1.selector.type = multiplexing
# 这是给Event中的header(默认是空的)中添加一个state的key
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
state会作为Event 中 header的一个key,CZ、US 会作为 state的value。如果给定的是 CZ,那么这个选择器会给c1,如果value是US,那么选择器会给 c2和c3。若 CZ 和US 都没选择上那么会给 c4。
这里需要结合拦截器来用
。
Custom Channel Selector:自定义选择器
除了以上两种外,可以使用 *Custom Channel Selector,自定义选择器。还没用过,先了解一下
SinkProcessor
官方文档flume-sink-processors
Default Sink Processor:
默认的 sink组,只能接收一个sink。可以不用写sink测试。
Failover Sink Processor:故障转移,防止其他sink宕机造成数据丢失
配置:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load balancing Sink Processor:负载均衡
- 配置:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
- a1.sinkgroups.g1.processor.backoff
这是一种退避算法,需要结合 processor.selector.maxTimeOut(默认为30秒)来使用。 - a1.sinkgroups.g1.processor.selector:
若使用 Load balancing Sink Processor 来实现负载均衡,那么负载均衡的策略可使用processor.selector定义。包括 轮询、随机、自定义,默认为轮询。
processor.selector round_robin Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
什么是退避算法
image.png 第一次:以轮询的方式将数据发送给sink1,2,3。
第二次:准备发送数据时,发现sink 挂掉了,此时就不会将sink1发送数据了,而是轮询给其他服务器(sink2,sink3) ,并且在processor.selector.maxTimeOut
时间范围内,不会在访问sink1。
第三次:若已经超过了processor.selector.maxTimeOut
时间范围,便会再访问一次 sink1。若sink1依旧访问不了,那么会将原来的processor.selector.maxTimeOut
时间翻一倍。如1,2,4,8...
实现方式参考:https://www.iteye.com/blog/qiuqiang1985-1513049
网友评论