美文网首页大数据
flume与kafka集成遇到的问题与解决思路

flume与kafka集成遇到的问题与解决思路

作者: xcrossed | 来源:发表于2016-07-15 23:52 被阅读5612次

0x00 背景知识

基本上想去用flume的同学都知道点flume的用途了。flume是一个分布式,可靠的,易用的,可以将不同源的日志进行,收集,汇总,或者存储的中间件。

0x01 使用场景

  • 数据来源:系统现有日志,有python脚本源源不断的从s3上拉下来,每10分钟拉一次,一次可能会拉取多个日志文件,视日志量而定,每个文件最大是10w行,超过会被分割。
  • 数据流转:需要及时将上面产生的日志发往kafka

0x02 flume的使用

flume支持三种不同的agent来发送数据,我这里比较符合的是spooldir这种方式.

  • 基本配置如下
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

0x03 遇到的问题

  • 文件读写报错

SpoolDir Source throws IllegalStateException: File has changed size since being read

运行后报上面的错,查了资料说是在flume读这个文件时,该文件不能被继续写入。改了数据生成逻辑,在没有写完成前,以.tmp结尾,写完后,再重命名去掉tmp后缀。涉及到的配置也比较简单

a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
  • kafka partition数据不均匀问题

发送kafka partition上面的数据不均匀,每次发送时,只往一个partition上面发,并没有同时往多个partition上面发。

查了资料,说是发送消息时不指定key将会随机发,但事实上,并没有。
这时,自己用python带的kafka python库直接发送测试,数据是均匀的。说明kafka集群是没问题的。这时候问题出在 kafka sink端。

事情到了这里,似乎需要正面刚这个问题了。
去flume官网下载源文件

  • 查看kafka sink源码,在flume-ng-kafka-sink这个文件夹下面,只有三个文件。
  • 查看最主要的文件 KafkaSink.java
  • 查看构造key的过程,核心代码如下
public static final String KEY_HDR = "key";

eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
messageList.add(data);

说明key是从event中拿到的,我们只需要在event中构造一个包含key为 key 的header 键值对就能达到目的。

事情到了这里,似乎只要搞定event中加key就可以搞定了。

查询官方文档,发现还有一个拦截器 Interceptor 的玩意儿。

flume默认提供了一些拦截器

  • Timestamp Interceptor
  • Host Interceptor
  • Static Interceptor
  • UUID Interceptor
  • Regex Filtering Interceptor
  • Regex Extractor Interceptor
  • Search and Replace Interceptor
  • orphline Interceptor
  • ...

我们需要一个能配置headerName的拦截器,找了一下,只有uuid拦截器符合要求。

a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key

加上上面二行,重启flume

/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

查看kafka-manager中的partition中message的分布,果然妥妥的均匀了。

完整的配置如下:

a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

使用的版本为flume 1.6

其实,真正没有随机的原因本文并没有直接去找到,只是另辟蹊径解决了问题。

相关文章

网友评论

    本文标题:flume与kafka集成遇到的问题与解决思路

    本文链接:https://www.haomeiwen.com/subject/rltujttx.html