美文网首页
flume kafka source sink

flume kafka source sink

作者: 杰杰微电 | 来源:发表于2021-03-09 15:52 被阅读0次

    如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,数据仍然会被写入到Source中指定的Topic中。

    agent_log.sources = kafka0

    agent_log.channels = ch0

    agent_log.sinks = sink0

    agent_log.sources.kafka0.channels = ch0

    agent_log.sinks.sink0.channel = ch0

    #sources定义

    agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource

    agent_log.sources.kafka0.kafka.bootstrap.servers = localhost:9092

    #agent.sources.kafka-source.zookeeper.connect =127.0.0.1:2181

    agent_log.sources.kafka0.kafka.topics = testsong,songtest

    agent_log.sources.kafka0.kafka.group.id= test

    #拦截器处理,topic覆盖问题

    agent_log.sources.kafka0.interceptors = i1

    agent_log.sources.kafka0.interceptors.i1.type = static

    agent_log.sources.kafka0.interceptors.i1.key = topic

    agent_log.sources.kafka0.interceptors.i1.preserveExisting = false

    agent_log.sources.kafka0.interceptors.i1.value = testsongout

    #channels定义

    agent_log.channels.ch0.type = memory

    agent_log.channels.ch0.capacity = 2048

    agent_log.channels.ch0.transactionCapacity = 1000

    #sink定义

    agent_log.sinks.sink0.channel = ch0

    agent_log.sinks.sink0.type = org.apache.flume.sink.kafka.KafkaSink 

    agent_log.sinks.sink0.brokerList = localhost:9092 

    agent_log.sinks.sink0.topic = testsongout

    相关文章

      网友评论

          本文标题:flume kafka source sink

          本文链接:https://www.haomeiwen.com/subject/lqkbqltx.html