1. Install Flume
First we configure the data source to read from a network port; the data is then sent both to HDFS and to the cmcc topic in Kafka. The Flume configuration file is:
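As a minimal installation sketch, assuming Flume 1.6.0 downloaded from the Apache archive (the version and paths are assumptions; adjust to your environment):
# Download and unpack Flume (version 1.6.0 is an assumption)
wget https://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar -xzf apache-flume-1.6.0-bin.tar.gz
cd apache-flume-1.6.0-bin
# Create the agent configuration used below from the shipped template
cp conf/flume-conf.properties.template conf/flume-conf.properties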
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = cmcc
a1.sinks.k1.brokerList = hostname:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Describe the HDFS sink
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.path = hdfs://hostname:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k2.hdfs.filePrefix = cmcc
a1.sinks.k2.hdfs.minBlockReplicas = 1
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 60
a1.sinks.k2.hdfs.rollSize = 0
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
The line a1.sources.r1.channels = c1 c2 means there is a single copy of the source data, which is fanned out to two different channels.
2. Install Kafka
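As a minimal sketch of this step, assuming a single-node Kafka with its bundled ZooKeeper; the --zookeeper flag matches the old-style brokerList property used in the sink configuration above, and all hosts and ports are assumptions:
# Start ZooKeeper and the Kafka broker (each in its own shell)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
# Create the cmcc topic that the Kafka sink writes to
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cmcc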
3. Test that data sent through Flume reaches HDFS and Kafka
Start the Flume agent with the following command:
bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -n a1
Then, in a Linux shell, run telnet localhost 44444 and type some data:
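A session might look like this (the input line is illustrative; the netcat source acknowledges each accepted event with OK):
telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
hello cmcc
OK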
Then check HDFS:
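For example, with the HDFS shell (the glob mirrors the hdfs.path pattern configured above; actual file names will differ):
# List the files rolled into the time-partitioned directory
hdfs dfs -ls -R /flume/events
# Print the contents of the rolled files
hdfs dfs -cat '/flume/events/*/*/*/*/*/cmcc.*'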
Check the Kafka data:
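A console consumer can verify the messages; the --zookeeper form matches an old-style Kafka broker (on newer versions use --bootstrap-server hostname:9092 instead):
# Read the cmcc topic from the beginning
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic cmcc --from-beginning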