大数据常用的采集工具
image.png我们的数据源一般有两种
- 业务数据库mysql,oracle等 sqoop采集
- 日志 Flume采集
sqoop只能采集数据库资源到离线数仓当中
Flume可以采集日志 到离线数仓或者实时数仓当中
Flume简介
Flume是由Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的工具,在大数据领域主要看中了 flume海量的日志实时采集功能
Flume安装规则,日志在哪台机器,就在那台机器上安装
Flume架构
-
单机架构
image.png
image.png -
Source:对接数据源
-
Sink:目标,下沉地 flume采集完数据要导入的地方
-
Channel :中间缓存区,为了防止source的数据量很大而sink 发送的数据量又比较小所以加了中间缓冲区
以上三个组成了Flume的 Agent进程
拥有了以上架构,那么数据在flume中以什么样的形式存在的呢?
-
event 包 ,Flume内部数据传输的最小单元
Flume中的数据以 event形式存在的,event 相当于一个带头节点的 字节数组
image.png
-
串联架构
image.png
-级联架构
image.png
应用场景
场景一 实时日志采集
场景二 数据量很大,实时的小部分小部分的采集,节省时间
场景三 断点续传
Flume开发步骤
1.需要先编写配置文件开发手册
vim /export/server/flume/conf/exec-mem-log.properties
创建一个配置文件xxx.
image.png
channels 缓存,有内存和磁盘 一般采用内存 如果为了数据安全可以采用磁盘
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
#define the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#指定监视的source路径 和 监视模式 exec只能监听一个文件
#define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /export/server/flume/datas/test.log
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#define the sink
a1.sinks.k1.type = logger
#bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
2.Flume source 监听模式
-
exec source :只能监听采集单个文件
-
Taildir source:动态采集多个文件
-可以监听文件 也可可以监听目录TailDir Soucre
监控文件、监控目录
1. Tail,在Unix系统中,实时读取文件最后内容
tail -F xxx.log
2. Dir
监控某个目录,当目录中有文件出现时,立即读取文件数据
这个taildir组件并不是简单的将之前的两个合二为一,还支持断点续传的功能。
什么断点续传?
程序中断的时候,记录中断的位置 重启程序 接着干 保证连续Taildir中指定元数据文件的功能,如果采集过程中,发生故障,重启后可以根据元数据文件的内容,从没有采集过的数据那里开始采集
5、flume taildir 如何实现断点续传的.png
3.Channel 缓存
image.png在Flume中,编写Agent,最常见使用Channel缓存:Memory内存和File文件,负责临时缓存数据。
-Memory Channel:将数据缓存在内存中(常用)
-File Channel:将数据缓存在文件中
-参数
4.HDFS SINK
flume中最重要的一个sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
- 开发配置文件内容
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /export/server/flume/datas/test.log
#定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1.itcast.cn:8020/flume/test1
#指定生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = nginx
#指定生成的文件的后缀
a1.sinks.k1.hdfs.fileSuffix = .log
#指定写入HDFS的文件的类型:普通的文件 支持 序列化文件 压缩文件
a1.sinks.k1.hdfs.fileType = DataStream
#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1
- 启动服务
/export/server/flume/bin/flume-ng agent -n a1 \
-c /export/server/flume/conf/ \
-f /export/server/flume/conf/hdfs-mem-log.properties \
-Dflume.root.logger=INFO,console
Flume默认写入HDFS上会产生很多小文件,都在1KB左右,不利用HDFS存储
-解决 指定文件大小
# hdfs.rollInterval:
按照时间间隔生成文件
# hdfs.rollSize:
指定HDFS生成的文件大小
# hdfs.rollCount:
按照event个数生成文件
#上述这个三个参数 谁先满足条件 谁触发文件滚动
#企业中,最常见的是关闭 时间间隔 和event个数 只以文件大小来控制
hdfs.rollInterval=0
hdfs.rollSize=134217728
hdfs.rollCount=0
上述配置表示hdfs生成的文件达到128M 滚动切换
如何实现分区存储
添加动态时间标记目录
#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
#可以指定时间 里面的文件类似hive分区表
a1.sinks.k1.hdfs.path = hdfs://node1.itcast.cn:8020/flume/test3/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
使用hive msck 和 add partition 声明分区
网友评论