Integrating Flume with Kafka

Author: 螺旋加速器 | Published 2018-08-14 17:16

(All of the following was tested on JDK 1.8.0_66 under Windows 10, for learning and testing purposes only; parts are adapted from the official documentation.)

Installing and testing Flume

  Download apache-flume-1.8.0 and unpack it.
  Two files in the conf directory need attention: flume-env.ps1, which sets environment variables, and flume-conf.properties.template, a sample configuration file.
  Edit flume-env.ps1 and add the Java options:

$JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "

  Copy the sample configuration file, rename it ota-conf.properties, and change its contents to:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

  Start the agent:

bin\flume-ng.cmd agent -conf conf -conf-file conf\ota-conf.properties -name a1

  In another terminal, connect with telnet localhost 44444 and type a message followed by Enter; the event should appear in the Flume console output.
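The telnet step can also be scripted: Flume's netcat source reads newline-terminated lines from a TCP connection and replies "OK" per event, so any TCP client works. A minimal sketch follows; note that a real run would connect to the agent on port 44444, while this self-contained version stands in a throwaway server on an ephemeral port so the mechanics are runnable on their own.

```python
import socket
import threading

def send_line(host, port, line):
    """Send one newline-terminated line, like typing it into telnet."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall(line.encode("utf-8") + b"\n")
        # Flume's netcat source acknowledges each event with "OK" by default.
        return sock.recv(16).decode("utf-8").strip()

def _fake_netcat_source(server):
    # Stand-in for the Flume netcat source: accept one connection,
    # read one line (one event), acknowledge it.
    conn, _ = server.accept()
    with conn:
        line = conn.makefile().readline()
        received.append(line.rstrip("\n"))
        conn.sendall(b"OK\n")

received = []
server = socket.socket()
server.bind(("127.0.0.1", 0))   # ephemeral port; use 44444 against a real agent
server.listen(1)
t = threading.Thread(target=_fake_netcat_source, args=(server,))
t.start()
reply = send_line("127.0.0.1", server.getsockname()[1], "hello flume")
t.join()
server.close()
```

Swapping the ephemeral port for 44444 (with the agent running) sends a real event through the source.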

Installing and testing Kafka

  Kafka requires ZooKeeper (you can use the one bundled with Kafka; I installed it separately). Download and unpack it; I used zookeeper-3.4.8.
  Before starting ZooKeeper, edit its configuration file, paying particular attention to the dataDir and clientPort settings:

dataDir=D:\\develop\\zookeeper-3.4.8\\data
clientPort=2181

  Then start the server:

bin\zkServer.cmd

  Download and unpack Kafka; I used kafka_2.11-2.0.0. In config\server.properties, the main setting to adjust is the ZooKeeper connection address:

zookeeper.connect=localhost:2181
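If the console producer below cannot reach the broker, it may also be worth checking the listener setting in server.properties; it is commented out by default, in which case the broker listens on port 9092:

```properties
# Uncomment and adjust only if the default (all interfaces, port 9092) is unsuitable
#listeners=PLAINTEXT://localhost:9092
```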

  Start Kafka and create a topic:

bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  Test publishing a message:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

  Test consuming messages:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
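Conceptually, a topic partition is an append-only log, and each consumer tracks an offset into it; --from-beginning simply starts reading at offset 0 instead of at the current end of the log. A toy sketch of that offset bookkeeping (not the real client API, which also handles batching, replication, and offset commits):

```python
class ToyPartition:
    """Toy model of one Kafka topic partition: an append-only log."""

    def __init__(self):
        self.log = []

    def produce(self, msg):
        self.log.append(msg)
        return len(self.log) - 1           # offset of the new record

    def consume(self, offset):
        """Return (messages, next_offset) from the given offset onward."""
        return self.log[offset:], len(self.log)

p = ToyPartition()
for m in ["one", "two", "three"]:
    p.produce(m)

# A consumer joining late WITHOUT --from-beginning starts at the current
# end of the log and sees nothing until new messages arrive:
latest_msgs, _ = p.consume(len(p.log))

# WITH --from-beginning it starts at offset 0 and replays everything:
all_msgs, next_off = p.consume(0)
```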

Configuring Flume to write to Kafka

  The updated Flume configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -f -n+1 info.log
# Describe the sink
#a1.sinks.k1.type = logger
#a1.sinks.k1.maxBytesToLog =1024
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#a1.channels.c1.byteCapacity = 1000000
#a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  Restart Flume and send messages via telnet. Then use the Kafka console consumer from the previous section to confirm that the messages reach Kafka and can be consumed.
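The kafka.flumeBatchSize = 20 setting above controls how many channel events the KafkaSink drains per transaction before flushing them to the producer. A rough sketch of that batching behavior (simplified; the real sink also handles transactions and rollback, and channel/producer here are stand-ins, not Flume or Kafka APIs):

```python
FLUME_BATCH_SIZE = 20   # mirrors a1.sinks.k1.kafka.flumeBatchSize = 20

def drain_batches(channel_events, batch_size=FLUME_BATCH_SIZE):
    """Split queued channel events into batches of at most batch_size,
    one batch per sink transaction; the last batch may be partial."""
    batches = []
    for i in range(0, len(channel_events), batch_size):
        batches.append(channel_events[i:i + batch_size])
    return batches

# 45 queued events -> two full batches of 20 and one partial batch of 5.
events = [f"event-{n}" for n in range(45)]
batches = drain_batches(events)
sizes = [len(b) for b in batches]
```

Larger batches amortize the per-send overhead at the cost of latency, which is also why the config pairs it with a small producer linger.ms.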

Original post: https://www.haomeiwen.com/subject/gkuvbftx.html