美文网首页大数据
flume与kafka集成遇到的问题与解决思路

flume与kafka集成遇到的问题与解决思路

作者: xcrossed | 来源:发表于2016-07-15 23:52 被阅读5612次

    0x00 背景知识

    基本上想去用flume的同学都知道点flume的用途了。flume是一个分布式,可靠的,易用的,可以将不同源的日志进行,收集,汇总,或者存储的中间件。

    0x01 使用场景

    • 数据来源:系统现有日志,有python脚本源源不断的从s3上拉下来,每10分钟拉一次,一次可能会拉取多个日志文件,视日志量而定,每个文件最大是10w行,超过会被分割。
    • 数据流转:需要及时将上面产生的日志发往kafka

    0x02 flume的使用

    flume支持三种不同的agent来发送数据,我这里比较符合的是spooldir这种方式.

    • 基本配置如下
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = flume0
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.flume0.type = spooldir
    a1.sources.flume0.spoolDir = /data/appdata/download
    a1.sources.flume0.fileHeader = false
    a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = shopping
    a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 100
    
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 200
    a1.channels.c1.transactionCapacity = 200
    
    # Bind the source and sink to the channel
    a1.sources.flume0.channels = c1
    a1.sinks.k1.channel = c1
    

    0x03 遇到的问题

    • 文件读写报错

    SpoolDir Source throws IllegalStateException: File has changed size since being read

    运行后报上面的错,查了资料说是在flume读这个文件时,该文件不能被继续写入。改了数据生成逻辑,在没有写完成前,以.tmp结尾,写完后,再重命名去掉tmp后缀。涉及到的配置也比较简单

    a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
    
    • kafka partition数据不均匀问题

    发送kafka partition上面的数据不均匀,每次发送时,只往一个partition上面发,并没有同时往多个partition上面发。

    查了资料,说是发送消息时不指定key将会随机发,但事实上,并没有。
    这时,自己用python带的kafka python库直接发送测试,数据是均匀的。说明kafka集群是没问题的。这时候问题出在 kafka sink端。

    事情到了这里,似乎需要正面刚这个问题了。
    去flume官网下载源文件

    • 查看kafka sink源码,在flume-ng-kafka-sink这个文件夹下面,只有三个文件。
    • 查看最主要的文件 KafkaSink.java
    • 查看构造key的过程,核心代码如下
    public static final String KEY_HDR = "key";
    
    eventKey = headers.get(KEY_HDR);
    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
              (eventTopic, eventKey, eventBody);
    messageList.add(data);
    

    说明key是从event中拿到的,我们只需要在event中构造一个包含key为 key 的header 键值对就能达到目的。

    事情到了这里,似乎只要搞定event中加key就可以搞定了。

    查询官方文档,发现还有一个拦截器 Interceptor 的玩意儿。

    flume默认提供了一些拦截器

    • Timestamp Interceptor
    • Host Interceptor
    • Static Interceptor
    • UUID Interceptor
    • Regex Filtering Interceptor
    • Regex Extractor Interceptor
    • Search and Replace Interceptor
    • orphline Interceptor
    • ...

    我们需要一个能配置headerName的拦截器,找了一下,只有uuid拦截器符合要求。

    a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    a1.sources.flume0.interceptors.i1.headerName = key
    

    加上上面二行,重启flume

    /usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
    

    查看kafka-manager中的partition中message的分布,果然妥妥的均匀了。

    完整的配置如下:

    a1.sources = flume0
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.flume0.type = spooldir
    a1.sources.flume0.spoolDir = /data/appdata/download
    a1.sources.flume0.fileHeader = false
    a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
    a1.sources.flume0.interceptors = i1
    a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    a1.sources.flume0.interceptors.i1.headerName = key
    a1.sources.flume0.interceptors.i1.preserveExisting = false
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = shopping
    a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 100
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 200
    a1.channels.c1.transactionCapacity = 200
    
    # Bind the source and sink to the channel
    a1.sources.flume0.channels = c1
    a1.sinks.k1.channel = c1
    

    使用的版本为flume 1.6

    其实,真正没有随机的原因本文并没有直接去找到,只是另辟蹊径解决了问题。

    相关文章

      网友评论

        本文标题:flume与kafka集成遇到的问题与解决思路

        本文链接:https://www.haomeiwen.com/subject/rltujttx.html