Flume and Kafka Integration

Author: hipeer | Published 2018-10-18 18:20

    Sending data to Kafka with Flume
    • Source - local file
    • Channel - local file
    • Sink - Kafka
    # test Agent
    test.sources = testSource
    test.channels = testChannel
    test.sinks = testSink
    
    test.sources.testSource.type = spooldir
    test.sources.testSource.deserializer = LINE
    test.sources.testSource.deserializer.maxLineLength = 6400
    test.sources.testSource.spoolDir = /events/input/intra/test
    test.sources.testSource.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
    test.sources.testSource.channels = testChannel
    
    test.channels.testChannel.type = file
    test.channels.testChannel.checkpointDir = /var/flume/checkpoint/test
    test.channels.testChannel.dataDirs = /var/flume/data/test
    
    test.sinks.testSink.type = org.apache.flume.sink.kafka.KafkaSink
    test.sinks.testSink.batchSize = 640
    test.sinks.testSink.brokerList = localhost:6667
    test.sinks.testSink.topic = test1
    test.sinks.testSink.channel = testChannel
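Note that `includePattern` is a Java regular expression, so the repetition braces must be `{n}` (the original had stray `]` characters) and the literal dot in `.csv` should be escaped. A quick sanity check of the corrected pattern, using `grep -E` (whose extended-regex syntax agrees with Java's for this pattern):

```shell
# Corrected includePattern: "test_", four-digit year, two-digit month and day, literal ".csv"
pattern='^test_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv$'

echo 'test_2018-10-18.csv' | grep -Eq "$pattern" && echo "matches"
echo 'test_2018-1-18.csv'  | grep -Eq "$pattern" || echo "rejected"   # single-digit month fails {2}
```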
    
    Sending data from Kafka to HDFS with Flume
    • Source - Kafka
    • Channel - Memory
    • Sink - HDFS
    # Sources, channels, and sinks are defined
    kgc.sources = kafkaSource
    kgc.channels = memoryChannel
    kgc.sinks = hdfsSink
    
    # kafka Source
    kgc.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    kgc.sources.kafkaSource.zookeeperConnect = localhost:2181
    kgc.sources.kafkaSource.topic = demo
    kgc.sources.kafkaSource.batchSize = 1
    kgc.sources.kafkaSource.channels = memoryChannel
    
    # memory Channel
    kgc.channels.memoryChannel.type = memory
    kgc.channels.memoryChannel.capacity = 10000
    kgc.channels.memoryChannel.transactionCapacity = 1000
    
    # hdfs Sink
    kgc.sinks.hdfsSink.channel = memoryChannel
    kgc.sinks.hdfsSink.type = hdfs
    kgc.sinks.hdfsSink.hdfs.writeFormat = Text
    kgc.sinks.hdfsSink.hdfs.fileType = DataStream
    kgc.sinks.hdfsSink.hdfs.filePrefix = kgcEvents
    kgc.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
    kgc.sinks.hdfsSink.hdfs.path = /tmp/demo/%y-%m-%d
    kgc.sinks.hdfsSink.hdfs.rollCount=1
    kgc.sinks.hdfsSink.hdfs.rollSize=0
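The `%y-%m-%d` escapes in `hdfs.path` follow the usual strftime-style conventions (`%y` = two-digit year, `%m` = month, `%d` = day), and since `useLocalTimeStamp = true` the sink stamps each event with the agent's local clock. The directory name for today's events can be previewed with `date`:

```shell
# Illustration only: the same escape sequences, evaluated against local time.
# Events arriving now would land under /tmp/demo/<this value>.
date +%y-%m-%d
```

Also note that `rollCount = 1` together with `rollSize = 0` closes an HDFS file after every single event, which is handy for a demo but produces many small files in production.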
    
    Sending data to HDFS with Kafka as the channel
    • Source - local file
    • Channel - Kafka
    • Sink - HDFS
    exercise.sources = execSource
    exercise.channels = kafkaChannel
    exercise.sinks = hdfsSink
     
    exercise.sources.execSource.type = exec
    exercise.sources.execSource.command = /usr/bin/vmstat 1
    exercise.sources.execSource.channels = kafkaChannel
     
    exercise.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
    exercise.channels.kafkaChannel.capacity = 10000
    exercise.channels.kafkaChannel.transactionCapacity = 1000
    exercise.channels.kafkaChannel.brokerList = localhost:6667
    exercise.channels.kafkaChannel.topic = flafka
    exercise.channels.kafkaChannel.zookeeperConnect = localhost:2181
    exercise.channels.kafkaChannel.parseAsFlumeEvent = true
     
    exercise.sinks.hdfsSink.type = hdfs
    exercise.sinks.hdfsSink.hdfs.path = /tmp/kafka/channel
    exercise.sinks.hdfsSink.hdfs.rollInterval = 5
    exercise.sinks.hdfsSink.hdfs.rollSize = 0
    exercise.sinks.hdfsSink.hdfs.rollCount = 0
    exercise.sinks.hdfsSink.hdfs.fileType = DataStream
    exercise.sinks.hdfsSink.channel = kafkaChannel
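To run any of these agents, pass the properties file and the agent name to `flume-ng`. A sketch for the Kafka-channel agent above (the conf directory and file path are assumptions; adjust them to your installation):

```shell
# Start the "exercise" agent defined above
flume-ng agent \
  --conf /etc/flume/conf \
  --conf-file /etc/flume/conf/exercise.conf \
  --name exercise \
  -Dflume.root.logger=INFO,console
```

Once running, the exec source streams `vmstat 1` output through the `flafka` Kafka topic into HDFS; `hdfs dfs -ls /tmp/kafka/channel` should show a new file roughly every 5 seconds, since `rollInterval = 5` governs rolling here.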
    
