前言
Channel选择器是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。
![](https://img.haomeiwen.com/i14534869/356ed6f8eaf9a41b.png)
由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。
Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。
Flume内置两种选择器,replicating 和 multiplexing,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器。
复制Channel选择器
如果Source没有指定Channel选择器,则该Source使用复制Channel选择器。该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。
复制Channel选择器还有一个配置参数optional,该参数指定的所有Channel都是可选的,当事件写入到这些Channel时又失败发生,则忽略这些失败。写入其他Channel的失败将导致Source抛出异常,并要求Source重试。
配置示例
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
如上所示,未能写入c3不会导致ChannelException异常抛出到Source,并且Source将通知前一阶段写入是成功的。
多路复用Channel选择器
多路复用Channel选择器是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由。可以结合拦截器使用。
多路复用Channel选择器寻找一个特定的报头,该报头通过通过选择器的配置指定,基于该报头的值,选择器返回事件写入Channel的一个子集,如果配置没有指定一个特定事件的报头,则该事件写入到默认Channel。
参数 | 描述 |
---|---|
selector.type | 默认replicating,多路复用为multiplexing |
selector.header | 默认flume.selector.header,配置为需要的报头名 |
selector.default | 默认channel |
selector.mapping.* | 报头对应的值为多少时,写入到哪些Channel |
对于每个事件,选择器查找配置中header参数指定的报头的键值,接下来检查该值是否与mapping中配置的值一致,如果一致Channel处理器会将事件写入到mapping配置的所有channel中。可选的映射也可以使用optional指定。如果选择器没有找到匹配或者报头不存在,那么他将事件写入到default参数配置的channel中。
如果一个事件没有映射到必须的channel,但是映射到一个或者多个可选的channel,那么该事件会被写到可选channel和默认的channel,任何写入到默认channel的失败将会导致抛出ChannelException异常。
下面是配置示例
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.optional.US = c4
a1.sources.r1.selector.default = c4
state值为CZ的事件写入到c1,state值为US的事件写入到c2,c3,如果都不满足写入到c4。
自定义Channel选择器
自定义的Channel需要实现ChannelSelector接口,或者继承AbstractChannelSelector类。
对应于每个事件,Channel处理器调用Channel选择器的getRequiredChannels和getoptionalChannels方法,返回需要和可选的将要写入事件的Channel列表。
AbstractChannelSelector 抽象类
public abstract class AbstractChannelSelector implements ChannelSelector {
private List<Channel> channels;
private String name;
@Override
public List<Channel> getAllChannels() {
return channels;
}
@Override
public void setChannels(List<Channel> channels) {
this.channels = channels;
}
@Override
public synchronized void setName(String name) {
this.name = name;
}
@Override
public synchronized String getName() {
return name;
}
/**
*
* @return A map of name to channel instance.
*/
protected Map<String, Channel> getChannelNameMap() {
Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
for (Channel ch : getAllChannels()) {
channelNameMap.put(ch.getName(), ch);
}
return channelNameMap;
}
/**
* Given a list of channel names as space delimited string,
* returns list of channels.
* @return List of {@linkplain Channel}s represented by the names.
*/
protected List<Channel> getChannelListFromNames(String channels,
Map<String, Channel> channelNameMap) {
List<Channel> configuredChannels = new ArrayList<Channel>();
if (channels == null || channels.isEmpty()) {
return configuredChannels;
}
String[] chNames = channels.split(" ");
for (String name : chNames) {
Channel ch = channelNameMap.get(name);
if (ch != null) {
configuredChannels.add(ch);
} else {
throw new FlumeException("Selector channel not found: "
+ name);
}
}
return configuredChannels;
}
}
ChannelSelector 接口
/**
* <p>
* Allows the selection of a subset of channels from the given set based on
* its implementation policy. Different implementations of this interface
* embody different policies that affect the choice of channels that a source
* will push the incoming events to.
* </p>
*/
public interface ChannelSelector extends NamedComponent, Configurable {
/**
* @param channels all channels the selector could select from.
*/
public void setChannels(List<Channel> channels);
/**
* Returns a list of required channels. A failure in writing the event to
* these channels must be communicated back to the source that received this
* event.
* @param event
* @return the list of required channels that this selector has selected for
* the given event.
*/
public List<Channel> getRequiredChannels(Event event);
/**
* Returns a list of optional channels. A failure in writing the event to
* these channels must be ignored.
* @param event
* @return the list of optional channels that this selector has selected for
* the given event.
*/
public List<Channel> getOptionalChannels(Event event);
/**
* @return the list of all channels that this selector is configured to work
* with.
*/
public List<Channel> getAllChannels();
}
Channel处理器调用SetChannel方法,该方法传递给所有的Channel,即选择器必须为每个事件选择Channel。该类实现了Configuration接口,所以当选择器初始化时调用Configure 方法。当每个事件要处理时,调用getRequiredChannel和getOptionChannel方法,getAllChannel方法必须返回在创建期间Channel处理器设置的所有Channel。
网友评论