集群规划 | 服务器hadoop102 | 服务器hadoop103 | 服务器hadoop104 |
---|---|---|---|
Flume(消费Kafka) | Flume |
1 项目经验之Flume组件选型
1)FileChannel和MemoryChannel区别
MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
选型:
金融类公司、对钱要求非常准确的公司通常会选择FileChannel。
传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。
2)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。
3)Sink:HDFS Sink
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount。
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
2.Flume拦截器
由于flume默认会用linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费kafka里面的数据时,有可能已经是第二天了,那么这部分数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。
代码:https://github.com/Yobhel121/edu-flume-interceptor
1)需要先将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下面。
[yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor
edu-flume-interceptor-1.0-SNAPSHOT.jar
2)分发Flume到hadoop102、hadoop103
[yobhel@hadoop101 module]$ xsync flume/
3 日志消费Flume配置
1)Flume配置分析
image.png
2)Flume的具体配置如下:
(1)在hadoop103的/opt/module/flume/job目录下创建kafka-flume-hdfs.conf文件
[yobhel@hadoop103 job]$ vim kafka-flume-hdfs.conf
在文件配置如下内容。
## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.TimestampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/data/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
(2)在hadoop104的/opt目录下创建 data 目录,用于存放 FileChannel 相关文件
[yobhel@hadoop104 ~]$ cd /opt/
[yobhel@hadoop104 opt]$ sudo mkdir data
[yobhel@hadoop104 opt]$ sudo chown yobhel:yobhel data
[yobhel@hadoop104 opt]$ ll
总用量 0
drwxr-xr-x. 2 yobhel yobhel 6 3月 6 19:32 data
drwxr-xr-x. 7 yobhel yobhel 95 3月 6 19:11 module
drwxr-xr-x. 2 yobhel yobhel 6 3月 6 16:51 software
4 日志消费Flume启动停止脚本
1)在/home/yobhel/bin目录下创建脚本f2.sh
[yobhel@hadoop101 bin]$ vim f2.sh
在脚本中填写如下内容。
#! /bin/bash
case $1 in
"start"){
for i in hadoop103
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop103
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
done
};;
esac
2)增加脚本执行权限
[yobhel@hadoop101 bin]$ chmod +x f2.sh
3)f2 通道启动脚本
[yobhel@hadoop101 module]$ f2.sh start
4)f2 通道停止脚本
[yobhel@hadoop101 module]$ f2.sh stop
4.6.5 项目经验之Flume内存优化
1)问题描述:如果启动消费Flume抛出如下异常
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解决方案步骤:
(1)在hadoop101服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms2000m -Xmx2000m -Dcom.sun.management.jmxremote"
(2)同步配置到hadoop102、hadoop103服务器
[yobhel@hadoop101 conf]$ xsync flume-env.sh
3)Flume内存参数设置及优化
JVM heap一般设置为4G或更高。
-Xms与-Xmx最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。
网友评论