美文网首页
Flume Sink处理器

Flume Sink处理器

作者: 吃货大米饭 | 来源:发表于2019-09-29 09:12 被阅读0次

介绍

Sink运行器(Sink Runner)运行一个Sink组(Sink Group),Sink组可以含有一个或多个Sink。如果组中只存在一个Sink,那么没有组将会更有效率。Sink运行器仅仅是一个询问Sink组(或Sink)来处理下一批事件的线程。每个Sink组有一个Sink处理器(Sink Processor),处理器选择组中的Sink之一去处理下一个事件集合。每个Sink只能从一个Channel获取数据(一个Sink只能有一个Channel),尽管多个Sink可以从同一个Channel获取数据。选定的Sink(或如果没有组,唯一的Sink)从Channel中接受事件,并将事件写入到下一阶段或最终目的地。

Sink处理器

Sink处理器决定任何时候哪个Sink是活跃的组件。

Sink处理器与Sink运行器不同。sink运行器实际上是运行sink的,而sink处理器决定了哪个sink应该从自己的channel中拉取事件。
Flume自带了两类Sink处理器:load-balancing Sink处理器和failover Sink处理器

Load-Balancing Sink处理器

Load-Balancing Sink 处理器从所有的Sink中选择一个Sink,处理来自Channel的事件。
意义所在:
假设第一层100个agent,第二层有4个agent。第一层每个agent将有4个avro sink用来推送数据到第二层的每个agent。该工作正常运行,直到其中第二层的一个agent失败。此时,配置发送数据的sink 将不会发送任何数据,直到第二层失败的agent重新上线。
这种情况下,sink耗尽了agent上的几个线程,浪费了CPU周期,直到第二层agent启动并运行,通过创建事件的事务且回滚。该sink也会给channel造成额外的压力。

为了避免这样的问题,sink组使用load-balancing sink处理器是一个好主意。它将从sink组所有的sink中选择一个sink,处理来自channel的事件。

如果Sink写入到一个失败的Agent或者速度太慢的Agent,会导致超时,Sink处理器会选择另一个Sink写数据。

Sink处理器可以配置将失败的Sink加入黑名单,降级时间以指数方式增长直到达到上限值。这能确保相同的Sink不会循环重复尝试且不浪费资源,直到降级时间过期。

参数 默认值 描述
sinks - 空格分割的sink列表
processor.type default 组件类型名称 load_balance/ failover
processor.backoff false 失效降级
processor.selector round_robin 有round_robin和random两种方式
processor.selector.maxTimeOut 30000 降级时间30秒

Sink选择的顺序可以为random或者round-robin。如果顺序被设置为random,那么将随机从Sink组的Sink中选择一个(每次都会),用来从自己的Channel中移除事件并将它们写出。round-robin选项使Sink以循环的方式被选择:每个选择循环调用定义Sink组中指定顺序Sink的process方法。

配置示例

  • nc-memory-avro.conf
#define source
nc-avro-agent.sources.nc-source.type=netcat
nc-avro-agent.sources.nc-source.bind=0.0.0.0
nc-avro-agent.sources.nc-source.port=44444

#define channel
nc-avro-agent.channels.nc-memory-channel.type=memory

#define sink
nc-avro-agent.sinks.avro-sink-1.type=avro
nc-avro-agent.sinks.avro-sink-1.hostname=hadoop000
nc-avro-agent.sinks.avro-sink-1.port=44445

nc-avro-agent.sinks.avro-sink-2.type=avro
nc-avro-agent.sinks.avro-sink-2.hostname=hadoop000
nc-avro-agent.sinks.avro-sink-2.port=44446

#define sink processor
nc-avro-agent.sinkgroups=avro-sink-group
nc-avro-agent.sinkgroups.avro-sink-group.sinks=avro-sink-1 avro-sink-2
nc-avro-agent.sinkgroups.avro-sink-group.processor.type=load_balance
nc-avro-agent.sinkgroups.avro-sink-group.processor.backoff=true
nc-avro-agent.sinkgroups.avro-sink-group.processor.selector=random


#bind source and sink to channel
nc-avro-agent.sources.nc-source.channels=nc-memory-channel
nc-avro-agent.sinks.avro-sink-1.channel=nc-memory-channel
nc-avro-agent.sinks.avro-sink-2.channel=nc-memory-channel

该配置意味着,在任何时候每个agent只有一个sink写数据。可以通过添加多个有相似配置的load-balancing sink处理器的sink组进行修改。

Failover Sink处理器

Failover Sink处理器从Sink组中以优先级的顺序选择Sink。拥有最高优先级的Sink先写数据直到它失败,然后选择组中其他Sink中拥有最高优先级的Sink。这能确保当没有失败时,每台机器上只有一个Sink写入到第二层的所有Agent。

同时,这意味着,即使已经失败的最高优先级的sink恢复了,Failover Sink处理器也不会让写入该Sink激活,直到目前活跃的sink遇到一个错误。

参数 默认值 描述
sinks - 空格分割的sink列表
processor.type default 组件类型名称 load_balance/ failover
processor.priority.<sinkName> - sink的优先级,越大越高
processor.maxpenalty 30000 降级时间30秒
failover sink 处理器工作流程
配置示例
#define source
nc-avro-agent.sources.nc-source.type=netcat
nc-avro-agent.sources.nc-source.bind=0.0.0.0
nc-avro-agent.sources.nc-source.port=44444

#define channel
nc-avro-agent.channels.nc-memory-channel.type=memory

#define sink
nc-avro-agent.sinks.avro-sink-1.type=avro
nc-avro-agent.sinks.avro-sink-1.hostname=hadoop000
nc-avro-agent.sinks.avro-sink-1.port=44445

nc-avro-agent.sinks.avro-sink-2.type=avro
nc-avro-agent.sinks.avro-sink-2.hostname=hadoop000
nc-avro-agent.sinks.avro-sink-2.port=44446

#define sink processor
nc-avro-agent.sinkgroups=avro-sink-group
nc-avro-agent.sinkgroups.avro-sink-group.sinks=avro-sink-1 avro-sink-2
nc-avro-agent.sinkgroups.avro-sink-group.processor.type=failover
nc-avro-agent.sinkgroups.avro-sink-group.processor.priority.avro-sink-1=5
nc-avro-agent.sinkgroups.avro-sink-group.processor.priority.avro-sink-2=10
nc-avro-agent.sinkgroups.avro-sink-group.processor.maxpenalty=10000


#bind source and sink to channel
nc-avro-agent.sources.nc-source.channels=nc-memory-channel
nc-avro-agent.sinks.avro-sink-1.channel=nc-memory-channel
nc-avro-agent.sinks.avro-sink-2.channel=nc-memory-channel

Load-Balancing Sink和Failover Sink处理器的区别

案例

从上面的例子中我们对比他们的区别,Load-Balancing Sink处理器是将左边4个Sink作为一组,这一组的每个sink的目的地都是不一样的,每次根据processor.selector值配置的方式去选择哪个sink去发送事件。

Failover Sink处理器还是将多个sink放到一个sink组中,每次只有一个sink对外发送数据,优先选择sink的优先级最大的去发送,然后往死里用,直到用挂了,再抬走换下一个。因此,Failover Sink没有负载平衡处理。只是做到容灾。

相关文章

网友评论

      本文标题:Flume Sink处理器

      本文链接:https://www.haomeiwen.com/subject/rdoiyctx.html