背景
在前面一章中已经大概介绍了对分布式系统监控的平台设计。这一章节主要是详细说明如何实现日志的采集功能。
设计思路上一章节也介绍了,大致就是:
1.shell脚本会将系统信息写入到指定的日志文件中。
2.通过操作系统的计划任务周期性的调用shell脚本。
3.再由flume-agent来将日志文件的增量信息通过消息的方法发送到日志归集端。
下面具体介绍每个步骤的实现。
shell脚本
创建一个shell脚本文件resource-info.sh,修改文件为可执行文件
chmod 777 resource-info.sh
下面是一个关于操作系统资源信息统计的脚本:
#!/bin/bash
#这个脚本使用来统计CPU、磁盘、内存使用率、带宽的
total=0
system=0
user=0
i=0
echo_str=""
#带宽使用情况
logpath="/home/bea1/logs/resource-statics/test.log"
time=`date "+%Y-%m-%d %k:%M"`
day=`date "+%Y-%m-%d"`
minute=`date "+%k:%M"`
echo_str=${echo_str}"|date_stastic:$day $minute"
#循环五次,避免看到的是偶然的数据
#echo "#带宽的使用情况:#"
while (( $i<5 ))
do
#原先的`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`方式获取网卡的信息为空,已经注释掉
#rx_before=`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`
#tx_before=`ifconfig eth0|sed -n "7p"|awk '{print $6}'|cut -c7-`
rx_before=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $2}')
tx_before=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $10}')
sleep 2
#rx_after=`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`
#tx_after=`ifconfig eth0|sed -n "7p"|awk '{print $6}'|cut -c7-`
rx_after=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $2}')
tx_after=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $10}')
rx_result=$[(rx_after-rx_before)/1024/1024/2*8]
tx_result=$[(tx_after-tx_before)/1024/1024/2*8]
#echo "$time Now_In_Speed: $rx_result Mbps Now_OUt_Speed: $tx_result Mbps"
let "i++"
done
rx_result=$(cat $logpath|grep "$time"|awk '{In+=$4}END{print In}')
if [ -z "$rx_result" ]; then
rx_result=1
fi
tx_result=$(cat $logpath|grep "$time"|awk '{Out+=$7}END{print Out}')
if [ -z "$tx_result" ]; then
tx_result=1
fi
In_Speed=$(echo "scale=2;$rx_result/5"|bc)
Out_Speed=$(echo "scale=2;$tx_result/5"|bc)
#echo "#带宽的5次的平均值是:#"
echo_str=${echo_str}"|network_average_in_speed(Mbps):$In_Speed|network_average_out_speed(Mbps):$Out_Speed"
#CPU使用情况
which sar > /dev/null 2>&1
if [ $? -ne 0 ]
then
total=`vmstat 1 5|awk '{x+=$13;y+=$14}END{print x+y}'`
average=$(echo "scale=2;$total/5"|bc)
fi
#echo "#CPU使用率:#"
#echo "Total CPU is already use: $average%"
echo_str=${echo_str}"|cpu_use_rate(%):$average"
#磁盘使用情况(注意:需要用sed先进行格式化才能进行累加处理)
disk_used=$(df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{used+=$3} END{print used}')
disk_totalSpace=$(df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{totalSpace+=$2} END{print totalSpace}')
disk_all=$(echo "scale=4;$disk_used/$disk_totalSpace" | bc)
disk_percent1=$(echo $disk_all | cut -c 2-3)
disk_percent2=$(echo $disk_all | cut -c 4-5)
disk_warning=`df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{if ($5>85) print $5 $6;} '`
#echo "#磁盘利用率#"
#echo "hard disk has used: $disk_percent1.$disk_percent2%"
#echo -e "\t\t#磁盘存在目录使用率超过85%报警#"
#echo -e "\t\tover used: $disk_warning"
echo_str=${echo_str}"|disk_use_rate(%):$disk_percent1.$disk_percent2"
echo_str=${echo_str}"|disk_over_uesd(85%up):$disk_warning"
#内存使用情况
memory_used=$(free -m | awk 'NR==2' | awk '{print $3}')
buffer_used=$(free -m | awk 'NR==2' | awk '{print $6}')
cache_used=$(free -m | awk 'NR==2' | awk '{print $7}')
free=$(free -m | awk 'NR==2' | awk '{printf $4}')
memory_all=$(free -m | awk 'NR==2' | awk '{print $2}')
used_all=$[memory_all-(free+buffer_used+cache_used)]
#echo "$used_all $memory_all $free" >>$logpath
echo_str=${echo_str}"|memory_total(MB):$memory_all|memory_used(MB):$used_all|memory_free(MB):$free"
memory_percent=$(echo "scale=4;$memory_used / $memory_all" | bc)
memory_percent2=$(echo "scale=4; $used_all / $memory_all" | bc)
percent_part1=$(echo $memory_percent | cut -c 2-3)
percent_part2=$(echo $memory_percent | cut -c 4-5)
percent_part11=$(echo $memory_percent2 | cut -c 2-3)
percent_part22=$(echo $memory_percent2 | cut -c 4-5)
#echo "#内存使用率#"
#echo "system memory is already use: $percent_part1.$percent_part2%"
#echo "actual memory is already use: $percent_part11.$percent_part22%"
echo_str=${echo_str}"|memory_system_use_rate(%):$percent_part1.$percent_part2|memory_actual_use_rate(%):$percent_part11.$percent_part22"
#echo "结束本次统计:$day $minute"
#echo "*************************************************************************"
#echo "$echo_str"
echo "$echo_str" >> $logpath
执行shell脚本进行测试
sh -x ./resource-info.sh
执行成功将会在指定的文件中生成如下的日志信息:
|date_stastic:2018-08-17 10:57|network_average_in_speed(Mbps):.20|network_average_out_speed(Mbps):.20|cpu_use_rate(%):|disk_use_rate(%):14.18|disk_over_uesd(85%up):|memory_total(MB):7982|memory_used(MB):4230|memory_free(MB):175|memory_system_use_rate(%):97.80|memory_actual_use_rate(%):52.99
编写计划任务
这里为linux系统为例,使用cron来实现该功能。linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。另 外, 由于使用者自己也可以设置计划任务,所以, Linux 系统也提供了使用者控制计划任务的命令 :crontab 命令。
cron的安装这里就忽略不讲述了,下面以cron已经安装为前提介绍一下如何配置。
执行以下指令,编辑计划任务
crontab -e
在编辑器中编写如下信息
0,15,30,45 * * * * sh /home/bea1/shell/crontab/resource-statics.sh
就是每15分钟执行一下脚本,编写完成以后保存退出就可以了。
日志采集
日志采集是使用flume来实现的,下面大概介绍一下flume以及具体描述flume的安装配置与配置。
Flume简介
Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。
flume-tech.png
Flume的一些核心概念:
Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
Client:生产数据,运行在一个独立的线程。
Source:从Client收集数据,传递给Channel。
Sink 从Channel收集数据,运行在一个独立线程。
Channel:连接 sources 和 sinks ,这个有点像一个队列。
Events:可以是日志记录、 avro 对象等。
Flume运行流程:
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个Java进程,运行在日志收集节点。
三大组件介绍:
*Source
从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等
*Channel
channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等.
*Sink
sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.
flume安装
通过官方地址下载flume安装包[http://mirror.bit.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz]
将安装包解压并移动到指定的目录下:
tar -zxvf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin/ /usr/bea1/shell/apache-flume-1.6.0-bin
由于默认情况下flume是没有提供对rocketmq的source及sink实现的。所以需要对其source和sink实现并将实现对应的第三方依赖包放在flume中。幸运的是rocketmq项目组已经对flume的source和sink实现了,只需要我们编译一下,将包引入到flume中就可以了。
首先下将工程源码:[https://gitee.com/mirrors/RocketMQ-Externals/tree/master/rocketmq-flume]
下将以后解压工程,并编译打包,由于工程是java的maven工程,所以需要安装jdk及maven。编译完成以后就
子工程RocketMQ-Externals/tree/master/rocketmq-flume生成的包及依赖包复制到flume的lib子目录中。
rocketmq-flume-souce-0.0.2-SNAPSHOT.jar
rocketmq-flume-sink-0.0.2-SNAPSHOT.jar
netty-all-4.0.36.Final.jar
rocketmq-common-4.0.0-incubating.jar
rocketmq-remoting-4.0.0-incubating.jar
fastjson-1.2.12.jar
rocketmq-client-4.0.0-incubating.jar
配置flume
在flume的conf子目录下创建一个配置文件flume-conf-resource.properties并进行修改
resource.sources = r1
resource.sinks = k1
resource.channels = c1
# Describe/configure the source
resource.sources.r1.type = exec
resource.sources.r1.command = tail -F /home/bea1/logs/resource-statics/resources-info.log
# Describe the sink
resource.sinks.k1.type = org.apache.rocketmq.flume.ng.sink.RocketMQSink
resource.sinks.k1.nameserver=10.209.8.126:9876
resource.sinks.k1.topic = RESOURCES_INFO_TOPIC
resource.sinks.k1.tag = 10.10.108.204
resource.sinks.k1.producerGroup = RESOURCES_INFO_PRODUCER_GROUP
# Use a channel which buffers events in memory
resource.channels.c1.type = memory
resource.channels.c1.capacity = 100
resource.channels.c1.transactionCapacity = 100
resource.channels.c1.keep-alive=3
# Bind the source and sink to the channel
resource.sources.r1.channels = c1
resource.sinks.k1.channel = c1
上面已经对flume的三大核组件source, channel,sink进行了介绍。在这个示例中:
我使用的source是通过执行tail -F 指令来获取日志的增量信息。
channel是使用的内存缓存
sink是rocketmq,该类型的具体参数配置说明如下:
key | nullable | default | description |
---|---|---|---|
nameserver | false | nameserver address | |
topic | true | "FLUME_TOPIC" | topic name |
tag | true | "FLUME_TAG" | tag name |
producerGroup | true | "FLUME_PRODUCER_GROUP" | producerGroup name |
batchSize | true | 1 | max batch event taking num |
maxProcessTime | true | 1000 | max batch event taking time,default is 1s |
运行flume
通过执行flume子目录bin下面的flume-ng指令来启动flume-agent
nohup /home/bea1/shell/apache-flume-1.8.0-bin/bin/flume-ng agent -c conf -f conf/flume-conf-process.properties -n process -Dflume.root.logger=INFO,console >/home/bea1/logs/statics/flume-process-output.file 2>&1 &
flume运行的情况可以通过日志文件/home/bea1/logs/statics/flume-process-output.file进行查看。
到这里日志采集的功能就实现完了。
网友评论