Integrating Flume with Kafka

Author: 螺旋加速器 | Published 2018-08-14 17:16

    (The following was tested on JDK 1.8.0_66 and Windows 10; it is for learning and testing only, and parts are adapted from the official documentation.)

    Install and test Flume

      Download apache-flume-1.8.0 and extract it.
      In the conf directory there are two files to configure: flume-env.ps1, which sets environment variables, and flume-conf.properties.template, which is a sample configuration file.
      Edit flume-env.ps1 and add the JAVA_OPTS setting:

    $JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
    

      Copy the sample configuration file, name it ota-conf.properties, and change its contents to:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

      Run:

    bin\flume-ng.cmd agent -name a1 -conf-file conf\ota-conf.properties
    

      Open another command window, connect with telnet localhost 44444, and type some text; the messages appear in the Flume console output.
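
      If the telnet client is not enabled (on Windows 10 it is an optional feature), a small Java program can send the same test data to the netcat source. The class name and message below are illustrative assumptions, not part of Flume:

    // NetcatTestClient.java -- hypothetical helper for testing the netcat source without telnet.
    // The netcat source treats every newline-terminated line as one Flume event.
    import java.io.OutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;

    public class NetcatTestClient {
        public static void main(String[] args) throws Exception {
            try (Socket socket = new Socket("localhost", 44444);
                 OutputStream out = socket.getOutputStream()) {
                out.write("hello flume\n".getBytes(StandardCharsets.UTF_8));
                out.flush();
            }
        }
    }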

    Install and test Kafka

      Kafka needs ZooKeeper (either the ZK bundled with Kafka or a separate install; I installed it separately). Download and extract it; I used zookeeper-3.4.8.
      Before starting ZK, edit its configuration file and adjust the dataDir and clientPort entries:

    dataDir=D:\\develop\\zookeeper-3.4.8\\data
    clientPort=2181
    

      Run the command:

    bin\zkServer.cmd
    

      Download and extract Kafka; I used kafka_2.11-2.0.0. Edit the configuration file server.properties, mainly to point to the ZooKeeper address:

    zookeeper.connect=localhost:2181
    

      Start Kafka and create a topic:

    bin\windows\kafka-server-start.bat config\server.properties
    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

      Test publishing messages:

    bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
    

      Test consuming messages:

    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
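
      The same checks can also be done programmatically with the kafka-clients library bundled with this Kafka release (assumed to be on the classpath). Below is a minimal producer sketch, with an illustrative class name and message, that publishes one record to the test topic:

    // TestProducer.java -- minimal sketch against the kafka-clients 2.0.0 producer API.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() is asynchronous; closing the producer flushes the pending record.
                producer.send(new ProducerRecord<>("test", "hello kafka"));
            }
        }
    }

      The record should then show up in the console consumer started above.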
    

    Configure Flume to output data to Kafka

      The Flume configuration file is as follows:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    #a1.sources.r1.type = exec
    #a1.sources.r1.command = tail -f -n+1 info.log
    # Describe the sink
    #a1.sinks.k1.type = logger
    #a1.sinks.k1.maxBytesToLog =1024
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type= snappy
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    #a1.channels.c1.byteCapacity = 1000000
    #a1.channels.c1.transactionCapacity = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

      Restart Flume and send messages via telnet. Use the Kafka console consumer from the previous section to verify that the messages are received and consumed by Kafka correctly.
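
      For an end-to-end check without the console tools, a small consumer program can subscribe to the test topic and print every event delivered by the Flume Kafka sink. The class name and consumer group below are illustrative assumptions:

    // FlumeEventConsumer.java -- minimal sketch against the kafka-clients 2.0.0 consumer API.
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class FlumeEventConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "flume-verify");  // hypothetical consumer group
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                while (true) {
                    // Every line typed into telnet should arrive here as one record value.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                }
            }
        }
    }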
