美文网首页大数据devops
flume同时使用KafkaSource、KafkaSink导致

flume同时使用KafkaSource、KafkaSink导致

作者: 赵荆州 | 来源:发表于2018-11-14 15:49 被阅读16次

    KafkaSource 配置topic:topic1
    KafkaSink 配置topic:topic2
    从topic1拉取数据,经过简单处理后发到topic2
    但是你会发现flume一直在循环读写topic1
    原因就是KafkaSink中的这段代码:

    if (eventTopic == null) {
        eventTopic = topic;
    } 
    

    首先从headers中获取TOPIC_HEADER(topic),然后优先使用。然而在KafkaSource中则会将topic1 PUT到Header中,所以导致循环读写topic1。

    如何解决呢?

    你可以重新KafkaSink,改掉上面那段代码。

    或者配置一个拦截器,将KafkaSource 写到Header中的topic给替换掉:

    agent.sources.so1.interceptors.i1.type = static
    agent.sources.so1.interceptors.i1.key = topic
    agent.sources.so1.interceptors.i1.preserveExisting = false
    agent.sources.so1.interceptors.i1.value = topic2
    

    相关文章

      网友评论

        本文标题:flume同时使用KafkaSource、KafkaSink导致

        本文链接:https://www.haomeiwen.com/subject/ohejfqtx.html