前言
Sink运行器(Sink Runner)运行一个Sink组(Sink Group),Sink组可以含有一个或多个Sink。如果组中只存在一个Sink,那么没有组将会更有效率。Sink运行器仅仅是一个询问Sink组(或Sink)来处理下一批事件的线程。每个Sink组有一个Sink处理器(Sink Processor),处理器选择组中的Sink之一去处理下一个事件集合。每个Sink只能从一个Channel获取数据(一个Sink只能有一个Channel),尽管多个Sink可以从同一个Channel获取数据。选定的Sink(或如果没有组,唯一的Sink)从Channel中接受事件,并将事件写入到下一阶段或最终目的地。
Sink组
Flume配置框架为每个Sink组实例化一个Sink运行器,来运行Sink组。每个Sink组可以包含任意数量的Sink。Sink运行器持续请求Sink组,要求其中的一个Sink从自己的Channel中读取事件。Sink组通常用于RPC Sink,在层之间以负载均衡或故障转移方式发送数据。
Sink组中的每个Sink必须单独进行配置。这包括:Sink从哪个Channel读取,写数据到哪些主机或者集群。
在理想情况下,如果Sink组中建立了几个Sink,所有的Sink将从相同的Channel读取,这将有利于在当前层以合理的速度清除数据,确保将要被发送到多台集群的数据,以一种支持负载均衡和故障转移的方式进行发送。
Sink处理器
Sink处理器决定任何时候哪个Sink是活跃的组件。
Sink处理器与Sink运行器不同。sink运行器实际上是运行sink的,而sink处理器决定了哪个sink应该从自己的channel中拉取事件。
Flume自带了两类Sink处理器:load-balancing Sink处理器和failover Sink处理器
Load-Balancing Sink处理器
Load-Balancing Sink 处理器从所有的Sink中选择一个Sink,处理来自Channel的事件。
意义所在:
假设第一层100个agent,第二层有4个agent。第一层每个agent将有4个avro sink用来推送数据到第二层的每个agent。该工作正常运行,直到其中第二层的一个agent失败。此时,配置发送数据的sink 将不会发送任何数据,直到第二层失败的agent重新上线。
这种情况下,sink耗尽了agent上的几个线程,浪费了CPU周期,直到第二层agent启动并运行,通过创建事件的事务且回滚。该sink也会给channel造成额外的压力。
为了避免这样的问题,sink组使用load-balancing sink处理器是一个好主意。它将从sink组所有的sink中选择一个sink,处理来自channel的事件。
如果Sink写入到一个失败的Agent或者速度太慢的Agent,会导致超时,Sink处理器会选择另一个Sink写数据。
Sink处理器可以配置将失败的Sink加入黑名单,回退时间以指数方式增长直到达到上限值。这能确保相同的Sink不会循环重复尝试且不浪费资源,直到回退时间过期。
参数 | 描述 |
---|---|
processor.sinks | 空格分割的sink列表 |
processor.type | load_balance / failover |
processor.selector | 默认round_robin,有round_robin和random两种方式 |
processor.backoff | 默认false,失败的sink是否启用回退 |
processor.selector.maxTimeOut | 默认30000,该时间之后黑名单时间周期不再增长 |
Sink选择的顺序可以为random或者round-robin。如果顺序被设置为random,那么将随机从Sink组的Sink中选择一个,用来从自己的Channel中移除事件并将它们写出。round-robin选项使Sink以循环的方式被选择:每个选择循环调用定义Sink组中指定顺序Sink的process方法。
配置示例
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
该配置意味着,在任何时候每个agent只有一个sink写数据。可以通过添加多个有相似配置的load-balancing sink处理器的sink组进行修改。
Failover Sink处理器
Failover Sink处理器从Sink组中以优先级的顺序选择Sink。拥有最高优先级的Sink先写数据直到它失败,然后选择组中其他Sink中拥有最高优先级的Sink。这能确保当没有失败时,每台机器上只有一个Sink写入到第二层的所有Agent。
同时,这意味着,即使已经失败的最高优先级的sink恢复了,Failover Sink处理器也不会让写入该Sink激活,直到目前活跃的sink遇到一个错误。
参数 | 描述 |
---|---|
processor.sinks | 空格分割的sink列表 |
processor.type | load_balance / failover |
processor.priority.<sinkName> | sink的优先级 |
processor.maxpenalty | 默认30000,失败Sink的最大回退时间 |
配置示例
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load-Balancing Sink和Failover Sink处理器的区别
案例从上面的例子中我们对比他们的区别,Load-Balancing Sink处理器是将左边4个Sink作为一组,这一组的每个sink的目的地都是不一样的,每次根据processor.selector值配置的方式去选择哪个sink去发送事件。
Failover Sink处理器还是将多个sink放到一个sink组中,每次只有一个sink堆外发送数据,优先选择sink的优先级最大的去发送,然后往死里用,直到用挂了,再抬走换下一个。因此,Failover Sink没有负载平衡处理。只是做到容灾。
网友评论