Big Data Learning: Flume

Author: 我问你瓜保熟吗 | Published 2021-01-06 20:03

    What Flume does

    • Collects files from disk and delivers them to HDFS
    • Collection sources: system log files, Python crawler output, network ports
    • Delivery targets: HDFS, Kafka

    flume 组成

    • agent 是一个独立的Flume进程,包含组件Source、Channel、Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。)
    • source Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中
    • channel Channel是一个缓冲区,它将保存事件直到Sink处理完该事件
    • sink Sink负责持久化日志或者把事件推向另一个Source。
    • Client:Client生产数据,运行在一个独立的线程。
    • Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)
    • Flow: Event从源点到达目的点的迁移的抽象。
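
    These components are wired together in a single properties file using the naming pattern <agent>.<component type>.<component name>.<property>. A minimal wiring sketch (the agent name a1 and component names r1/c1/k1 are arbitrary labels, not fixed keywords):

    # declare the components that belong to agent a1
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # configure each component through a1.<type>.<name>.<property>
    a1.sources.r1.type = netcat
    a1.channels.c1.type = memory
    a1.sinks.k1.type = logger

    # connect source -> channel and sink -> channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1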

    Flume sources

    • Taildir: tails files in a local directory
    • Avro: a lightweight RPC framework, used to chain one Flume agent to another
    • Kafka
    • NetCat: a Linux networking tool (the nc command)
    • Exec: runs a shell command such as tail -f and reads its output
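
    Of these, Taildir is the usual choice for tailing log files because it records its read position and can resume where it left off after the agent restarts. A minimal source sketch (component names and paths are illustrative assumptions):

    a1.sources.r1.type = TAILDIR
    # JSON file in which Taildir records the last read offset of each file
    a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
    # one or more file groups, each matching a set of files
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /var/log/app/.*log
    a1.sources.r1.channels = c1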

    Flume sinks

    • Logger: writes events to the console; mostly used for debugging
    • Avro: a lightweight RPC framework, used to chain one Flume agent to another
    • Kafka
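
    When the delivery target is Kafka rather than the console or HDFS, the Kafka sink is used. A minimal sink sketch (broker address and topic name are illustrative assumptions):

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # Kafka brokers to connect to and the topic that events are written into
    a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092
    a1.sinks.k1.kafka.topic = flume_events
    # number of events batched into one Kafka request
    a1.sinks.k1.kafka.flumeBatchSize = 100
    a1.sinks.k1.channel = c1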

    Flume channels

    • Memory
    • File
    • Kafka
    • JDBC
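
    The Memory channel (used in the cases below) is fast but loses any buffered events if the agent crashes; the File channel trades some throughput for durability by persisting events to disk. A minimal channel sketch (directories are illustrative assumptions):

    a1.channels.c1.type = file
    # directory where the channel keeps its checkpoint metadata
    a1.channels.c1.checkpointDir = /usr/local/flume/checkpoint
    # comma-separated list of directories in which event data is stored
    a1.channels.c1.dataDirs = /usr/local/flume/data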

    Installing Flume

    • Flume 1.9.0 download link
    • tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/flume
    • In the conf directory: cp flume-env.sh.template flume-env.sh
    • Add environment variables: vim /etc/profile, append the lines below, then run source /etc/profile
    export FLUME_HOME=/usr/local/flume
    export PATH=$PATH:$FLUME_HOME/bin
    
    • Check the Flume version: flume-ng version
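
    flume-env.sh usually only needs JAVA_HOME set so that the flume-ng script can find the JDK. A minimal sketch (the JDK path is an assumption; point it at your own installation):

    # /usr/local/flume/conf/flume-env.sh
    # JDK location (path is an assumption, adjust to your install)
    export JAVA_HOME=/usr/local/jdk1.8.0_212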

    Case 1: listen on a port and output to the console

    • Create the agent configuration file: touch /usr/local/flume/job/netcat-flume-logger.conf
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • Start the agent
    flume-ng agent -n a1 -c /usr/local/flume/conf -f /usr/local/flume/job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
    
    • Send test data to the source
    yum -y install nc   # the package may be named netcat or nmap-ncat depending on the distribution
    nc localhost 44444
    Type a line and send it, then check in the Flume console that the same content was received.
    

    Case 2: monitor appends to a single file and output to the console

    • Create the agent configuration file: touch /usr/local/flume/job/file-flume-logger.conf
    
    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /var/log/hive.log
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • Start the agent: flume-ng agent -n a1 -c /usr/local/flume/conf -f /usr/local/flume/job/file-flume-logger.conf -Dflume.root.logger=INFO,console
    • Write data into the log file to test (see the sketch below)
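
    A quick way to generate test data, assuming /var/log/hive.log exists and is writable (note that tail -f stops following a file that gets rotated; tail -F is more robust for rotated logs):

    # append a test line; it should appear in the Flume console shortly afterwards
    echo "hello from exec source $(date)" >> /var/log/hive.log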

    Case 3: monitor a single, continuously growing file and output to HDFS

    • Add the required Hadoop client jar dependencies to Flume's lib directory (see the sketch below)
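
    Exactly which jars are required depends on the Hadoop version; with a typical Hadoop 2.x installation they can be copied from the Hadoop distribution itself. A rough sketch (jar names and version numbers are assumptions, adjust them to your cluster):

    # copy the HDFS client jars needed by the hdfs sink into Flume's lib directory
    cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.2.jar \
       $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-2.7.2.jar \
       $HADOOP_HOME/share/hadoop/common/lib/commons-configuration-1.6.jar \
       $HADOOP_HOME/share/hadoop/common/lib/commons-io-2.4.jar \
       $HADOOP_HOME/share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar \
       $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.7.2.jar \
       /usr/local/flume/lib/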
    • Create the agent configuration file: touch /usr/local/flume/job/file-flume-hdfs.conf
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -f /var/log/hive.log
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop01:9000/flume/%Y%m%d/%H
    #prefix for files uploaded to HDFS
    a2.sinks.k2.hdfs.filePrefix = logs-
    #whether to roll over to a new directory based on time
    a2.sinks.k2.hdfs.round = true
    #how many time units before a new directory is created
    a2.sinks.k2.hdfs.roundValue = 1
    #the time unit used for the rounding above
    a2.sinks.k2.hdfs.roundUnit = hour
    #whether to use the local timestamp (rather than one from the event headers)
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #how many events to accumulate before flushing to HDFS
    a2.sinks.k2.hdfs.batchSize = 1000
    #file type; compressed formats are also supported
    a2.sinks.k2.hdfs.fileType = DataStream
    #how often to roll a new file: here a file is rolled into HDFS every 30 seconds, before that it remains a temporary file
    a2.sinks.k2.hdfs.rollInterval = 30
    #roll the file once it reaches this size in bytes (roughly 128 MB)
    a2.sinks.k2.hdfs.rollSize = 134217700
    #0 means rolling is independent of the number of events
    a2.sinks.k2.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
    • Start the agent: flume-ng agent -n a2 -c /usr/local/flume/conf -f /usr/local/flume/job/file-flume-hdfs.conf
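
    Once data is appended to /var/log/hive.log and the 30-second rollInterval has elapsed, the rolled files should be visible in HDFS (the NameNode address must match the hdfs.path configured above):

    # list today's date directory created by the %Y%m%d escape in hdfs.path
    hdfs dfs -ls /flume/$(date +%Y%m%d)/
    # print the contents of the rolled files for all hours of today
    hdfs dfs -cat /flume/$(date +%Y%m%d)/*/logs-*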

    Case 4: monitor a directory for new files and output to HDFS

    • Create the agent configuration file: touch /usr/local/flume/job/dir-flume-hdfs.conf

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = spooldir
    a2.sources.r2.spoolDir = /home/hadoop/upload
    a2.sources.r2.ignorePattern = ([^ ]*\.tmp)
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop01:9000/flume/%Y%m%d/%H
    #prefix for files uploaded to HDFS
    a2.sinks.k2.hdfs.filePrefix = logs-
    #whether to roll over to a new directory based on time
    a2.sinks.k2.hdfs.round = true
    #how many time units before a new directory is created
    a2.sinks.k2.hdfs.roundValue = 1
    #the time unit used for the rounding above
    a2.sinks.k2.hdfs.roundUnit = hour
    #whether to use the local timestamp (rather than one from the event headers)
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #how many events to accumulate before flushing to HDFS
    a2.sinks.k2.hdfs.batchSize = 1000
    #file type; compressed formats are also supported
    a2.sinks.k2.hdfs.fileType = DataStream
    #how often to roll a new file
    a2.sinks.k2.hdfs.rollInterval = 30
    #roll the file once it reaches this size in bytes (roughly 128 MB)
    a2.sinks.k2.hdfs.rollSize = 134217700
    #0 means rolling is independent of the number of events
    a2.sinks.k2.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
    • Start the agent: flume-ng agent -n a2 -c /usr/local/flume/conf -f /usr/local/flume/job/dir-flume-hdfs.conf
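
    To test, drop a finished file into the spooling directory; once Flume has ingested it, the file is renamed with a .COMPLETED suffix (files matching the ignorePattern above, i.e. *.tmp, are skipped). Assuming the directory from the config exists:

    # create a sample file and move it into the watched directory
    echo "spooldir test $(date)" > /tmp/sample.txt
    mv /tmp/sample.txt /home/hadoop/upload/
    # after ingestion the file shows up as sample.txt.COMPLETED
    ls /home/hadoop/upload/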
