美文网首页
Flume选择器

Flume选择器

作者: 叫我不矜持 | 来源:发表于2019-06-01 17:57 被阅读0次

    前言

    Channel选择器是决定Source接受的特定事件写入到哪个Channel的组件,他们告诉Channel处理器,然后由其将事件写入到Channel。

    Agent中各个组件的交互

    由于Flume不是两阶段提交,事件被写入到一个Channel,然后事件在写入下一个Channel之前提交,如果写入一个Channel出现异常,那么之前已经写入到其他Channel的相同事件不能被回滚。当这样的异常发生时,Channel处理器抛出ChannelException异常,事务失败,如果Source试图再次写入相同的事件(大多数情况下,会再次写入,只有Syslog,Exec等Source不能重试,因为没有办法生成相同的数据),重复的事件将写入到Channel中,而先前的提交是成功的,这样在Flume中就发生了重复。

    Channel选择器的配置是通过Channel处理器完成的,Channel选择器可以指定一组Channel是必须的,另一组的可选的。

    Flume内置两种选择器,replicating 和 multiplexing,如果Source配置中没有指定选择器,那么会自动使用复制Channel选择器。

    复制Channel选择器

    如果Source没有指定Channel选择器,则该Source使用复制Channel选择器。该选择器复制每个事件到通过Source的Channels参数指定的所有Channel中。

    复制Channel选择器还有一个配置参数optional,该参数指定的所有Channel都是可选的,当事件写入到这些Channel时又失败发生,则忽略这些失败。写入其他Channel的失败将导致Source抛出异常,并要求Source重试。

    配置示例

    a1.sources = r1
    a1.channels = c1 c2 c3
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2 c3
    a1.sources.r1.selector.optional = c3
    

    如上所示,未能写入c3不会导致ChannelException异常抛出到Source,并且Source将通知前一阶段写入是成功的。

    多路复用Channel选择器

    多路复用Channel选择器是一种专门用于动态路由事件的Channel选择器,通过选择事件应该写入到哪个Channel,基于一个特定的事件头的值进行路由。可以结合拦截器使用。

    多路复用Channel选择器寻找一个特定的报头,该报头通过通过选择器的配置指定,基于该报头的值,选择器返回事件写入Channel的一个子集,如果配置没有指定一个特定事件的报头,则该事件写入到默认Channel。

    参数 描述
    selector.type 默认replicating,多路复用为multiplexing
    selector.header 默认flume.selector.header,配置为需要的报头名
    selector.default 默认channel
    selector.mapping.* 报头对应的值为多少时,写入到哪些Channel

    对于每个事件,选择器查找配置中header参数指定的报头的键值,接下来检查该值是否与mapping中配置的值一致,如果一致Channel处理器会将事件写入到mapping配置的所有channel中。可选的映射也可以使用optional指定。如果选择器没有找到匹配或者报头不存在,那么他将事件写入到default参数配置的channel中。

    如果一个事件没有映射到必须的channel,但是映射到一个或者多个可选的channel,那么该事件会被写到可选channel和默认的channel,任何写入到默认channel的失败将会导致抛出ChannelException异常。

    下面是配置示例

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.optional.US = c4
    a1.sources.r1.selector.default = c4
    

    state值为CZ的事件写入到c1,state值为US的事件写入到c2,c3,如果都不满足写入到c4。

    自定义Channel选择器

    自定义的Channel需要实现ChannelSelector接口,或者继承AbstractChannelSelector类。

    对应于每个事件,Channel处理器调用Channel选择器的getRequiredChannels和getoptionalChannels方法,返回需要和可选的将要写入事件的Channel列表。

    AbstractChannelSelector 抽象类

    public abstract class AbstractChannelSelector implements ChannelSelector {
    
      private List<Channel> channels;
      private String name;
    
      @Override
      public List<Channel> getAllChannels() {
        return channels;
      }
    
      @Override
      public void setChannels(List<Channel> channels) {
        this.channels = channels;
      }
    
      @Override
      public synchronized void setName(String name) {
        this.name = name;
      }
    
      @Override
      public synchronized String getName() {
        return name;
      }
    
      /**
       *
       * @return A map of name to channel instance.
       */
    
      protected Map<String, Channel> getChannelNameMap() {
        Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
        for (Channel ch : getAllChannels()) {
          channelNameMap.put(ch.getName(), ch);
        }
        return channelNameMap;
      }
    
      /**
       * Given a list of channel names as space delimited string,
       * returns list of channels.
       * @return List of {@linkplain Channel}s represented by the names.
       */
      protected List<Channel> getChannelListFromNames(String channels,
              Map<String, Channel> channelNameMap) {
        List<Channel> configuredChannels = new ArrayList<Channel>();
        if (channels == null || channels.isEmpty()) {
          return configuredChannels;
        }
        String[] chNames = channels.split(" ");
        for (String name : chNames) {
          Channel ch = channelNameMap.get(name);
          if (ch != null) {
            configuredChannels.add(ch);
          } else {
            throw new FlumeException("Selector channel not found: "
                    + name);
          }
        }
        return configuredChannels;
      }
    
    }
    
    

    ChannelSelector 接口

    /**
     * <p>
     * Allows the selection of a subset of channels from the given set based on
     * its implementation policy. Different implementations of this interface
     * embody different policies that affect the choice of channels that a source
     * will push the incoming events to.
     * </p>
     */
    public interface ChannelSelector extends NamedComponent, Configurable {
    
      /**
       * @param channels all channels the selector could select from.
       */
      public void setChannels(List<Channel> channels);
    
      /**
       * Returns a list of required channels. A failure in writing the event to
       * these channels must be communicated back to the source that received this
       * event.
       * @param event
       * @return the list of required channels that this selector has selected for
       * the given event.
       */
      public List<Channel> getRequiredChannels(Event event);
    
    
      /**
       * Returns a list of optional channels. A failure in writing the event to
       * these channels must be ignored.
       * @param event
       * @return the list of optional channels that this selector has selected for
       * the given event.
       */
      public List<Channel> getOptionalChannels(Event event);
    
      /**
       * @return the list of all channels that this selector is configured to work
       * with.
       */
      public List<Channel> getAllChannels();
    
    }
    
    

    Channel处理器调用SetChannel方法,该方法传递给所有的Channel,即选择器必须为每个事件选择Channel。该类实现了Configuration接口,所以当选择器初始化时调用Configure 方法。当每个事件要处理时,调用getRequiredChannel和getOptionChannel方法,getAllChannel方法必须返回在创建期间Channel处理器设置的所有Channel。

    相关文章

      网友评论

          本文标题:Flume选择器

          本文链接:https://www.haomeiwen.com/subject/kcywzqtx.html