美文网首页我爱编程
数据采集之Flume+Kafka

数据采集之Flume+Kafka

作者: 吃橘子的冬天 | 来源:发表于2017-12-05 10:43 被阅读243次

    Flume简介

    1. Flume特点

    flume是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的,高可靠的,分布式等特性。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

    2. Flume核心概念

    • Agent 使用JVM运行Flume 每台机器运行一个agent , 但是可以在一个agent中包含多个sources和sinks
    • Client 生产数据 , 运行在一个独立的线程
    • Source 从Client收集数据 , 传递给Channel
    • Sink 从Channel收集数据 , 运行在一个独立线程
    • Channel 连接 sources 和 sinks ,这个有点像一个队列
    • Events 可以是日志记录、 avro 对象等

    Flume快速开发

    1. 安装

    • yum 方式下载安装 :
    [mis-ecif@hadoop10-4-0-226 ~]$ yum install flume 
    

    解压文件,若打印如下信息,解压缩报错 ,可能是包没下载完全,重新下载重试即可

    [mis-ecif@hadoop10-4-0-226 ~]$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
    gzip: stdin: unexpected end of file  
    tar: Unexpected EOF in archive  
    tar: Unexpected EOF in archive
    tar: Error is not recoverable: exiting now
    

    若解压成功,可检测安装是否成功:/usr/local/flume/bin/flume-ng version
    打印以下信息,则表示安装成功了

    [mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
    Flume 1.6.0-transwarp-tdh480
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: Unknown
    Compiled by root on Fri Apr  7 07:52:45 UTC 2017
    From source with checksum 4031fa0e0507f30090f954451ab3a164
    

    若打印以下信息,可能是因为安装了hbase,将Hbase的hbase-env.sh文件中HBASE_CLASS注释掉即可

    [mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
    Could not find or load main class org.apache.flume.tools.GetJavaProperty #加载不了该类
    Flume 1.6.0-transwarp-tdh480
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: Unknown
    Compiled by root on Fri Apr  7 07:52:45 UTC 2017
    From source with checksum 4031fa0e0507f30090f954451ab3a164
    

    2. 开发

    • 更改Flume配置文件
    cd /usr/local/flume/conf/
    cp flume-env.sh.template flume-env.sh
    vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值
    
    • 创建Flume启动使用到的配置文件 exec_tail.conf
    [root@hadoop10-1-0-144 conf]# vi /local/flume/conf/exec_tail.conf
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.channels = c1
    a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    3. 测试

    • 启动Flume
    flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 往Flume监控日志中添加数据
    echo 'phoneinfo||223.104.7.66||OPPO R9sk||6.0.1||天津市||2017-07-25 06:53:23||中国移动||yingyongbao
    ' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
    
    echo 'phoneinfo||101.38.64.172||iPhone 6 Plus||10.3.2||北京市||2017-07-25 07:11:40||中国联通' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
    
    

    控制台若有数据打印,则表示测试成功

    4. 更改配置,与kafka集成

    • 将消息传给 kafka
    # Describe the sink
    a1.sinks.k1.type = logger
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = flume_demo
    #a1.sinks.k1.brokerList = 10.1.0.141:9092,10.1.0.142:9092,10.1.0.143:9092,10.1.0.144:9092
    #Kafka集群Broker列表
    a1.sinks.k1.brokerList = 10.1.0.144:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 100
    
    • 将消息缓存在本地文件系统中 --建议将消息缓存在本地文件系统
    # Use a channel which buffers events in memory
    a1.channels.c1.type = file
    a1.channels.c1.checkpoint = /mnt/disk1/flume/checkpoint #检查点文件存储路径
    a1.channels.c1.dataDirs = /mnt/disk1/flume/data #消息数据存储路径
    
    • 创建kafka topic
    ./kafka-topics.sh --zookeeper 10.1.0.144:2181 --create --topic flume_demo --partition 3 --replication-factor 1
    
    • 查看topic
    ./kafka-topics.sh  --list --zookeeper 10.1.0.144:2181
    
    • 启动kafka consumer,接收flume消息
    ./kafka-console-consumer.sh --topic flume_demo  --bootstrap-server 10.1.0.144:9092
    
    • 重启Flume
    flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 往flume监控文件中添加日志
    echo 'phoneinfo||116.227.248.47||HUAWEI MT7-CL00||6.0||||2017-07-25 07:21:36||中国移动||yingyongbao'  >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
    

    查看kafka consumer窗口,若能够正常接收消息,则表示集成kafka成功。

    相关文章

      网友评论

        本文标题:数据采集之Flume+Kafka

        本文链接:https://www.haomeiwen.com/subject/bjpqixtx.html