美文网首页工具监控工具分布式&高可用
分布式系统监控(三)- 日志采集

分布式系统监控(三)- 日志采集

作者: do_young | 来源:发表于2018-08-23 10:31 被阅读223次

    背景

    在前面一章中已经大概介绍了对分布式系统监控的平台设计。这一章节主要是详细说明如何实现日志的采集功能。
    设计思路上一章节也介绍了,大致就是:
    1.shell脚本会将系统信息写入到指定的日志文件中。
    2.通过操作系统的计划任务周期性的调用shell脚本。
    3.再由flume-agent来将日志文件的增量信息通过消息的方法发送到日志归集端。
    下面具体介绍每个步骤的实现。

    shell脚本

    创建一个shell脚本文件resource-info.sh,修改文件为可执行文件

    chmod 777 resource-info.sh
    

    下面是一个关于操作系统资源信息统计的脚本:

    #!/bin/bash
    #这个脚本使用来统计CPU、磁盘、内存使用率、带宽的
    total=0
    system=0
    user=0
    i=0
    echo_str=""
    #带宽使用情况
    logpath="/home/bea1/logs/resource-statics/test.log"
    time=`date "+%Y-%m-%d %k:%M"`
    day=`date "+%Y-%m-%d"`
    minute=`date "+%k:%M"`
    
    echo_str=${echo_str}"|date_stastic:$day $minute"    
    
    #循环五次,避免看到的是偶然的数据
    #echo "#带宽的使用情况:#"  
    while (( $i<5 ))
    do
    #原先的`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`方式获取网卡的信息为空,已经注释掉
    #rx_before=`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`
    #tx_before=`ifconfig eth0|sed -n "7p"|awk '{print $6}'|cut -c7-`
    rx_before=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $2}')
    tx_before=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $10}')
    sleep 2
    #rx_after=`ifconfig eth0|sed -n "7p"|awk '{print $2}'|cut -c7-`
    #tx_after=`ifconfig eth0|sed -n "7p"|awk '{print $6}'|cut -c7-`
    rx_after=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $2}')
    tx_after=$(cat /proc/net/dev | grep 'eth' | tr : " " | awk '{print $10}')
    
    rx_result=$[(rx_after-rx_before)/1024/1024/2*8]
    
    tx_result=$[(tx_after-tx_before)/1024/1024/2*8]
    
    #echo  "$time Now_In_Speed: $rx_result Mbps Now_OUt_Speed: $tx_result Mbps"     
    
    let "i++"
    done
    
    rx_result=$(cat $logpath|grep "$time"|awk '{In+=$4}END{print In}')
    if [ -z "$rx_result" ]; then 
        rx_result=1
    fi
    tx_result=$(cat $logpath|grep "$time"|awk '{Out+=$7}END{print Out}')
    if [ -z "$tx_result" ]; then 
        tx_result=1
    fi
    In_Speed=$(echo "scale=2;$rx_result/5"|bc)
    Out_Speed=$(echo "scale=2;$tx_result/5"|bc)
    #echo "#带宽的5次的平均值是:#"   
    echo_str=${echo_str}"|network_average_in_speed(Mbps):$In_Speed|network_average_out_speed(Mbps):$Out_Speed"  
    
    #CPU使用情况
    which sar > /dev/null 2>&1
    if [ $? -ne 0 ]
    then
      total=`vmstat 1 5|awk '{x+=$13;y+=$14}END{print x+y}'`
      average=$(echo "scale=2;$total/5"|bc)
    fi
    
    #echo "#CPU使用率:#"   
    #echo "Total CPU  is already use: $average%"    
    echo_str=${echo_str}"|cpu_use_rate(%):$average"
    #磁盘使用情况(注意:需要用sed先进行格式化才能进行累加处理)
    disk_used=$(df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{used+=$3} END{print used}')
    disk_totalSpace=$(df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{totalSpace+=$2} END{print totalSpace}')
    disk_all=$(echo "scale=4;$disk_used/$disk_totalSpace" | bc)
    disk_percent1=$(echo $disk_all | cut -c 2-3)
    disk_percent2=$(echo $disk_all | cut -c 4-5)
    disk_warning=`df -m | sed '1d;/ /!N;s/\n//;s/ \+/ /;' | awk '{if ($5>85) print $5 $6;} '`
    #echo "#磁盘利用率#"     
    #echo "hard disk has used: $disk_percent1.$disk_percent2%"  
    #echo -e "\t\t#磁盘存在目录使用率超过85%报警#"   
    #echo -e "\t\tover used: $disk_warning"     
    echo_str=${echo_str}"|disk_use_rate(%):$disk_percent1.$disk_percent2"
    echo_str=${echo_str}"|disk_over_uesd(85%up):$disk_warning"
    
    
    #内存使用情况
    memory_used=$(free -m | awk 'NR==2' | awk '{print $3}')
    buffer_used=$(free -m | awk 'NR==2' | awk '{print $6}')
    cache_used=$(free -m | awk 'NR==2' | awk '{print $7}')
    free=$(free -m | awk 'NR==2' | awk '{printf $4}')
    memory_all=$(free -m | awk 'NR==2' | awk '{print $2}')
    used_all=$[memory_all-(free+buffer_used+cache_used)]
    #echo "$used_all $memory_all $free" >>$logpath
    echo_str=${echo_str}"|memory_total(MB):$memory_all|memory_used(MB):$used_all|memory_free(MB):$free"
    memory_percent=$(echo "scale=4;$memory_used / $memory_all" | bc)
    memory_percent2=$(echo "scale=4; $used_all / $memory_all" | bc)
    percent_part1=$(echo $memory_percent | cut -c 2-3)
    percent_part2=$(echo $memory_percent | cut -c 4-5) 
    percent_part11=$(echo $memory_percent2 | cut -c 2-3)
    percent_part22=$(echo $memory_percent2 | cut -c 4-5)
    #echo "#内存使用率#"     
    #echo "system memory is already use: $percent_part1.$percent_part2%"    
    #echo "actual memory is already use: $percent_part11.$percent_part22%"  
    echo_str=${echo_str}"|memory_system_use_rate(%):$percent_part1.$percent_part2|memory_actual_use_rate(%):$percent_part11.$percent_part22"
    
    #echo  "结束本次统计:$day $minute"    
    #echo  "*************************************************************************"  
    #echo "$echo_str"
    echo "$echo_str" >> $logpath
    

    执行shell脚本进行测试

    sh -x ./resource-info.sh
    

    执行成功将会在指定的文件中生成如下的日志信息:

    |date_stastic:2018-08-17 10:57|network_average_in_speed(Mbps):.20|network_average_out_speed(Mbps):.20|cpu_use_rate(%):|disk_use_rate(%):14.18|disk_over_uesd(85%up):|memory_total(MB):7982|memory_used(MB):4230|memory_free(MB):175|memory_system_use_rate(%):97.80|memory_actual_use_rate(%):52.99
    

    编写计划任务

    这里为linux系统为例,使用cron来实现该功能。linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。另 外, 由于使用者自己也可以设置计划任务,所以, Linux 系统也提供了使用者控制计划任务的命令 :crontab 命令。
    cron的安装这里就忽略不讲述了,下面以cron已经安装为前提介绍一下如何配置。
    执行以下指令,编辑计划任务

    crontab -e
    

    在编辑器中编写如下信息

    0,15,30,45 * * * * sh /home/bea1/shell/crontab/resource-statics.sh
    

    就是每15分钟执行一下脚本,编写完成以后保存退出就可以了。

    日志采集

    日志采集是使用flume来实现的,下面大概介绍一下flume以及具体描述flume的安装配置与配置。

    Flume简介

    Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。


    flume-tech.png

    Flume的一些核心概念:

    Agent:使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
    Client:生产数据,运行在一个独立的线程。
    Source:从Client收集数据,传递给Channel。
    Sink 从Channel收集数据,运行在一个独立线程。
    Channel:连接 sources 和 sinks ,这个有点像一个队列。
    Events:可以是日志记录、 avro 对象等。

    Flume运行流程:

    Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个Java进程,运行在日志收集节点。

    三大组件介绍:
    *Source
    从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等
    *Channel
    channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等.
    *Sink
    sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase.

    flume安装

    通过官方地址下载flume安装包[http://mirror.bit.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz]
    将安装包解压并移动到指定的目录下:

    tar -zxvf apache-flume-1.6.0-bin.tar.gz
    mv apache-flume-1.6.0-bin/ /usr/bea1/shell/apache-flume-1.6.0-bin
    

    由于默认情况下flume是没有提供对rocketmq的source及sink实现的。所以需要对其source和sink实现并将实现对应的第三方依赖包放在flume中。幸运的是rocketmq项目组已经对flume的source和sink实现了,只需要我们编译一下,将包引入到flume中就可以了。
    首先下将工程源码:[https://gitee.com/mirrors/RocketMQ-Externals/tree/master/rocketmq-flume]
    下将以后解压工程,并编译打包,由于工程是java的maven工程,所以需要安装jdk及maven。编译完成以后就
    子工程RocketMQ-Externals/tree/master/rocketmq-flume生成的包及依赖包复制到flume的lib子目录中。

    rocketmq-flume-souce-0.0.2-SNAPSHOT.jar
    rocketmq-flume-sink-0.0.2-SNAPSHOT.jar
    netty-all-4.0.36.Final.jar
    rocketmq-common-4.0.0-incubating.jar
    rocketmq-remoting-4.0.0-incubating.jar
    fastjson-1.2.12.jar
    rocketmq-client-4.0.0-incubating.jar
    

    配置flume

    在flume的conf子目录下创建一个配置文件flume-conf-resource.properties并进行修改

    resource.sources = r1  
    resource.sinks = k1  
    resource.channels = c1  
      
    # Describe/configure the source  
    resource.sources.r1.type = exec  
    resource.sources.r1.command = tail -F /home/bea1/logs/resource-statics/resources-info.log
    
    # Describe the sink  
    resource.sinks.k1.type = org.apache.rocketmq.flume.ng.sink.RocketMQSink
    resource.sinks.k1.nameserver=10.209.8.126:9876
    resource.sinks.k1.topic = RESOURCES_INFO_TOPIC
    resource.sinks.k1.tag = 10.10.108.204
    resource.sinks.k1.producerGroup = RESOURCES_INFO_PRODUCER_GROUP
      
    # Use a channel which buffers events in memory  
    resource.channels.c1.type = memory  
    resource.channels.c1.capacity = 100  
    resource.channels.c1.transactionCapacity = 100  
    resource.channels.c1.keep-alive=3
      
    # Bind the source and sink to the channel  
    resource.sources.r1.channels = c1  
    resource.sinks.k1.channel = c1  
    

    上面已经对flume的三大核组件source, channel,sink进行了介绍。在这个示例中:
    我使用的source是通过执行tail -F 指令来获取日志的增量信息。
    channel是使用的内存缓存
    sink是rocketmq,该类型的具体参数配置说明如下:

    key nullable default description
    nameserver false nameserver address
    topic true "FLUME_TOPIC" topic name
    tag true "FLUME_TAG" tag name
    producerGroup true "FLUME_PRODUCER_GROUP" producerGroup name
    batchSize true 1 max batch event taking num
    maxProcessTime true 1000 max batch event taking time,default is 1s

    运行flume

    通过执行flume子目录bin下面的flume-ng指令来启动flume-agent

     nohup /home/bea1/shell/apache-flume-1.8.0-bin/bin/flume-ng agent -c conf -f conf/flume-conf-process.properties -n process -Dflume.root.logger=INFO,console >/home/bea1/logs/statics/flume-process-output.file 2>&1 &
    

    flume运行的情况可以通过日志文件/home/bea1/logs/statics/flume-process-output.file进行查看。

    到这里日志采集的功能就实现完了。

    相关文章

      网友评论

        本文标题:分布式系统监控(三)- 日志采集

        本文链接:https://www.haomeiwen.com/subject/yccpiftx.html