Importing Local Files into Kafka with Flume


Author: Woople | Published 2017-05-26 17:29 · 644 reads

Business scenario

External devices periodically write text files into a local directory. Each line of these files is a pipe-delimited ("|") record, for example:

    2|235|872|6|460078003|0|||02000bda||
    2|235|852|3|460078004|0|||02000bda||
    1|235|772|1|460078005|0|||02000bda||
    

The task is to import the file contents into Kafka through Flume, using the value of the 5th field of each line as the key of the corresponding Kafka message.

Complete Flume configuration

    agent.sources = r1
    agent.channels = c1
    agent.sinks = s1
    
    agent.sources.r1.type = spooldir
    agent.sources.r1.spoolDir = /tmp/flume
    
    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type = regex_extractor
    agent.sources.r1.interceptors.i1.regex = .*\\|.*\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*
    agent.sources.r1.interceptors.i1.serializers = s1
    agent.sources.r1.interceptors.i1.serializers.s1.name = key
    
    agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.s1.topic = test
    agent.sinks.s1.brokerList = host1:6667
    agent.sinks.s1.requiredAcks = 1
    agent.sinks.s1.batchSize = 20
    
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    
    agent.sinks.s1.channel = c1
    agent.sources.r1.channels = c1
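The end-to-end effect of this configuration on a single input line can be sketched without running Flume. The snippet below is a minimal, self-contained simulation (plain java.util.regex, no Flume dependency; the class name FlumeKeyDemo is made up for illustration): the regex's only capture group grabs the 5th field, which, via serializers.s1.name = key, becomes the key header that the Kafka sink uses as the message key.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlumeKeyDemo {
    // Same regex as agent.sources.r1.interceptors.i1.regex above:
    // ten literal pipes, with the 5th field as the only capture group.
    private static final Pattern REGEX = Pattern.compile(
            ".*\\|.*\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");

    // Returns the value that would be stored in the "key" header, or null on no match.
    static String extractKey(String line) {
        Matcher m = REGEX.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // First sample line from the business scenario above
        System.out.println(extractKey("2|235|872|6|460078003|0|||02000bda||"));
        // prints: 460078003
    }
}
```

Running it against the first sample line prints 460078003, the value that would become the Kafka message key.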
    

Configuration explained

The key to this setup is Flume's regex extractor interceptor (regex_extractor); the following explains in detail how this interceptor works.

It relies on capture groups in the regular expression: the value matched by each group defined in regex becomes a value in the event headers, and the header key for that value is the name configured via the corresponding serializers.sN.name.

The following code simulates what the Flume regex extractor does:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Two capture groups: the 1st and 4th fields
    Pattern regex = Pattern.compile("(.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");

    Matcher matcher = regex.matcher("999|235|872|888|460077171338003|0|||02000bda||");

    if (matcher.find()) {
        // group(0) is the whole match, so capture groups are indexed from 1
        for (int group = 0, count = matcher.groupCount(); group < count; group++) {
            int groupIndex = group + 1;

            System.out.println(matcher.group(groupIndex));
        }
    }
    

The output is:

    999
    888
    

As the output shows, the interceptor first collects the values of all capture groups, then walks the serializers configuration in order, and each serializers.sN.name supplies the header key for the corresponding value.

For example, with the configuration below and a source line 999|235|872|888|460077171338003|0|||02000bda||, the event headers end up with two key-value pairs: key1->999 and key2->888.

    agent.sources.r1.interceptors.i1.regex = (.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*
    agent.sources.r1.interceptors.i1.serializers = s1 s2
    agent.sources.r1.interceptors.i1.serializers.s1.name = key1
    agent.sources.r1.interceptors.i1.serializers.s2.name = key2
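To double-check this mapping without running Flume, the interceptor's loop can be replayed by hand. This is a minimal sketch (plain java.util.regex; the class name TwoKeyDemo and the NAMES array are illustrative stand-ins for the serializers configuration):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TwoKeyDemo {
    // Same regex as the two-group configuration above
    private static final Pattern REGEX = Pattern.compile(
            "(.*)\\|.*\\|.*\\|(.*)\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*\\|.*");
    // serializers.s1.name and serializers.s2.name
    private static final String[] NAMES = {"key1", "key2"};

    static Map<String, String> extractHeaders(String line) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = REGEX.matcher(line);
        if (m.find()) {
            // Pair group 1 with NAMES[0], group 2 with NAMES[1], as the interceptor does
            for (int g = 1; g <= m.groupCount() && g <= NAMES.length; g++) {
                headers.put(NAMES[g - 1], m.group(g));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(extractHeaders("999|235|872|888|460077171338003|0|||02000bda||"));
        // prints: {key1=999, key2=888}
    }
}
```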
    

Source code analysis

The behavior described above is implemented in the interceptor's intercept method:

      public Event intercept(Event event) {
        Matcher matcher = regex.matcher(
        new String(event.getBody(), Charsets.UTF_8)); // event.getBody() is the event body
        Map<String, String> headers = event.getHeaders();
        if (matcher.find()) {
          for (int group = 0, count = matcher.groupCount(); group < count; group++) {
            int groupIndex = group + 1;
            if (groupIndex > serializers.size()) {
              if (logger.isDebugEnabled()) {
                logger.debug("Skipping group {} to {} due to missing serializer",
                    group, count);
              }
              break;
            }
            NameAndSerializer serializer = serializers.get(group);
            if (logger.isDebugEnabled()) {
              logger.debug("Serializing {} using {}", serializer.headerName,
                  serializer.serializer);
            }
            headers.put(serializer.headerName, // the header key set via serializers.xxx.name
                serializer.serializer.serialize(matcher.group(groupIndex))); // the value matched by the capture group
          }
        }
        return event;
      }
    
    


Original link: https://www.haomeiwen.com/subject/ylyxfxtx.html