美文网首页
马哥教育网站日志分析分步详解

马哥教育网站日志分析分步详解

作者: efreeway | 来源:发表于2021-06-22 10:50 被阅读0次

马哥教育的Wayne老师讲解的python课程非常注重实际。比如他讲解的一个日志分析的案例,就是从运维实际出发,把日志分析的各个环节的原理讲的非常透彻。虽然现在日志分析有很多功能强大的专门软件,但是通过这个案例了解了日志处理的原理,对于工作也有很多的益处。(Wayne老师的课程链接 https://ke.qq.com/course/134017?taid=537476502522753

Wayne老师的日志分析,每次讲课根据授课场景会有些变化。不过基本的组成部分大致是下面几步:

数据源的文本正则分析、数据类型转换、生产器方式产生数据、窗口函数、队列、线程和分发器、用Logging输出信息、通用的文件装载

尽管最终的程序不到100行,但是上面7个步骤包括了几个重要的模块的使用方法和一些编程技巧,熟练使用之后对于编程能力的提高很有好处。

(按照wayne老师的课堂提示:下面程序只是用于学习用途,还有很多不完善的地方)

下面一步步来,每一步的程序都是可以独立运行的,这样调试起来比较方便

1.数据源及正则分析

数据是网上查到的:故事大全的几天的日志数据

数据格式:

2017-02-26 00:00:15 222.187.225.152 GET m.gushidaquan.cc/news/yule/1266748_10.html - 80 - 123.125.71.100 Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html) - 200

2017-02-26 00:00:19 222.187.225.152 GET www.gushidaquan.cc/811/2016820207621.html - 80 - 67.229.136.58 Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1) http://www.baidu.com 404

要对数据进行正则分析,效率最高的办法是使用辅助工具来写正则表达式字符串。Wayne老师推荐的是regester(https://deerchao.cn/tools/regester/index.htm),是国人开发的全免费软件。可以离线使用,很方便。

把源文本黏贴到左下方的“源文本”区,上方的“模式”区里面输入正则表达式字符串,然后点击“运行”按钮,在右侧的“结果”区域里面会出现分析得到的信息,非常好用。要注意的一点,就是regester用的是标准正则表达式,要在python程序中使用,要注意正则分组名称前面要加一个大写的P。比如(?<srcip>[\d.]{7,})要改成(?P<srcip>[\d.]{7,})。

对于同一条字符串,同一个分析要求,可以有多种多样的正则表达式字符串,没有什么标准答案,只要能用就行。处理过程中,碰到有问题的字符串还可以记录下来,以便后续优化。

(先下载数据源,从里面任意log文件中截取100条左右的数据,作为测试数据,保存到程序文件目录下,命名为“sample.log”,如果碰到字符集不是utf-8的情况,就用notepad等工具看一下文件的编码格式,在程序中用相应的编码格式打开文件):

代码

import re

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

with open("sample.log") as f:

    for line in f:

        data = regex.match(line).groupdict()

        print(data)

2.数据类型转换

数据类型转换的思路非常有用,适用面非常广。这里的关键式掌握lambda表达式、datetime日期字符串的格式转换。

这里的主要目的,是把日期字符串转换为datatime模块的格式,以便后续时间处理时适用datetime模块的函数。第2个lambda函数的含义是从ops字典中,如果找不到相关的字段对应的转换操作函数,就不进行任何转换,保持原值。

代码:

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"),  "status":int }

with open("sample.log") as f:

    for line in f:

        data = {k: ops.get(k, lambda x: x)(v) for k,v in regex.match(line).groupdict().items()}

        print(data)

3.生产器方式产生数据

日志数据通常量比较大,如果不采用惰性求值(生成器),对服务器的内存和CPU都会造成较大的压力,还有可能把服务器给搞瘫掉,比较危险。生成器的应用范围非常广,而且也很简单,是一个必须掌握的基本功。

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:    # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

if __name__ == "__main__":

    src = extract("sample.log")

    for data in src:

        print(data)

4.窗口函数

在分析日志的时候,一般是隔一段时间输出前一个统计窗口的统计信息。比如,每隔5秒钟输出前面1分钟内网站的访问量,网站访问的状态统计等。一般间隔(interval,本例是5秒)和窗口的宽带(width,本例是1分钟,即60秒)。这种情况下,每次只需要对一个窗口宽度(width)的数据进行处理,处理之后,要把最老的interval的数据扔掉,补充新的interval的数据,进行下一次统计。

这个过程在“BeautifulSoulpy的时间窗口函数实现(https://www.jianshu.com/p/c9e8d3d9a33f)”讲的比较清楚。

代码:

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:    # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds= width - interval)

    buf = []

    while True:

        try:

            data = next(src)

        except StopIteration:

            break

        if data:

            current = data['datetime']

            buf.append(data)

        if (current-start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime']>=(current-delta)]

def dataprint(buffer):  # handler that just print data

    print("*" * 80)

    for data in buffer:

        print(data)

def datetimeprint(buffer):

    print("*" * 80)

    for data in buffer:

        print(data['datetime'])

def statusinfo(buffer):

    print("*" * 80)

    status =[x['status'] for x in buffer]

    print( {x: status.count(x)/len(status) for x in set(status)})

if __name__ == "__main__":

    src = extract("sample.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    windows(src, statusinfo, 10, 5)

5.队列、线程和分发器

日志分析中,会有多种需求,各种需求的width、interval和具体输出的内容不同。需要多个具体的处理函数,主程序要负责调度。这个就是分发器的由来。这里,分发器要做的是对每个统计需求都新产生一个线程,并把提取处理的日志信息发送到该线程对应的数据队列。分发器本身就是一个独立的讲课内容,wayne老师的课程中有专门的一个案例。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    print("*" * 80)

    for data in buffer:

        print(data)

def datetimeprint(buffer):

    print("*" * 80)

    for data in buffer:

        print(data['datetime'])

def statusinfo(buffer):

    print("*" * 80)

    status = [x['status'] for x in buffer]

    print({x: status.count(x) / len(status) for x in set(status)})

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval), name=str(handler))

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

if __name__ == "__main__":

    src = extract("20170224.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5)

    reg(datetimeprint, 10, 5)

    run()

6.用Logging输出信息

日志分析有多种展示形式,可以图形化、可以把存数据库,当然最简单的是直接打印。由于print语句不是线程安全的,因此如果适用print预计,各个线程输出的信息可能会混在一行中,造成数据混乱。模块logging提供了线程安全的输出方式。只要是牵涉到多线程、多进程,都会用到logging模块,也是必须掌握的基本功。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

import logging

logformat="%(threadName)s:  %(message)s"

logging.basicConfig(format=logformat, level=logging.INFO)

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    # print("*" * 80)

    for data in buffer:

        logging.info("{}".format(str(data)))

def datetimeprint(buffer):

    for data in buffer:

        logging.info("{}".format(data['datetime']))

def statusinfo(buffer):

    status = [x['status'] for x in buffer]

    # print({x: status.count(x) / len(status) for x in set(status)})

    logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval, name):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval),name=name)

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

if __name__ == "__main__":

    src = extract("20170224.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5, 'statusThread')

    reg(datetimeprint, 10, 5, 'dateprintThread')

    run()

7.通用的文件装载

日志文件可能是一个日志目录下面还有子目录,子目录中有日志文件,也有其他文件,如果只选择特定目录下面特定文件名后缀的日志文件来处理,也有比较通用的办法,这个在“采蘑菇的下午茶”有详细的介绍。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

import logging

from pathlib import Path

logformat="%(threadName)s:  %(message)s"

logging.basicConfig(format=logformat, level=logging.INFO)

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def loadfile(filename, encoding):

    # print(filename)

    with open(filename, encoding=encoding) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    # print("*" * 80)

    for data in buffer:

        logging.info("{}".format(str(data)))

def datetimeprint(buffer):

    for data in buffer:

        logging.info("{}".format(data['datetime']))

def statusinfo(buffer):

    status = [x['status'] for x in buffer]

    # print({x: status.count(x) / len(status) for x in set(status)})

    logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval, name):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval),name=name)

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

def load(*args, ext="*.log",recursive=False, encoding='utf-8'):

    for pf in args:

        print(pf)

        p = Path(pf)

        if p.exists():

            if p.is_dir():

                if type:

                    if isinstance(ext,str):

                        ext = [ext]

                    else:

                        ext = list(ext)

                    for s in ext:

                        fl = p.rglob(s) if recursive else p.glob(s)

                        for f in fl:

                            yield from loadfile(str(f.absolute()), encoding=encoding)

            elif p.is_file():

                yield from loadfile(str(p.absolute()), encoding=encoding)

if __name__ == "__main__":

    src = load("sample.log")

    # src = load('.')

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5, 'statusThread')

    reg(datetimeprint, 10, 5, 'dateprintThread')

    run()

说明

Wayne老师在教学视频里面多次强调敲代码的重要性。一套代码,看看好像懂了,把讲义合上,有很多的语句可能有敲不出来了。要真正理解一个概念,最简单的标准就是:不看讲义,自己能够从头到尾把代码敲出来并且能够编译通过,这样对于相关的知识才算是有了真正的理解。

现在网络上的python教学课程非常多。学习的关键是找一套合适自己的资料,腾讯课堂里面的马哥教育针对不同的学习方向有不同的python教程,好像都是Wayne老师在讲,根据不同的要求对教程进行了取舍,很棒。

资料来源:

1. 马哥教育wayne老师教学视频:Wayne老师的课程链接https://ke.qq.com/course/134017?taid=537476502522753

2. “采蘑菇的下午茶”的《Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合》:https://blog.csdn.net/qq_40498551/article/details/90181774

3. BeautifulSoulpy的时间窗口函数实现:https://www.jianshu.com/p/c9e8d3d9a33f

相关文章

  • 马哥教育网站日志分析分步详解

    马哥教育的Wayne老师讲解的python课程非常注重实际。比如他讲解的一个日志分析的案例,就是从运维实际出发,把...

  • 2019-01-03 ELK日志分析系统

    马哥教育号称:10 分钟快速搭建 ELK 日志分析系统 北京马哥教育发表于北京马哥教育订阅 1.4K 在这篇文章中...

  • Java阻塞队列SynchronousQueue详解

    作者: 一字马胡 转载标志 【2017-11-03】 更新日志 导入 在文章Java阻塞队列详解中分析了java...

  • 快速读懂网站日志的秘诀(新手必看)

    网站日志分析工作对老站长来说是非常容易的,从日志代码的查看到日志分析出的问题解决是了如指掌。其实网站日志分析不难,...

  • Oracle日志分析

    参考文献 UTL_FILE_DIR Oracle日志分析! 详解Oracle的日志的工具——LogMiner 一、...

  • Logparser的用法

    下载安装 Logparser是一款非常强大的日志分析软件,可以帮助你详细的分析网站日志。是所有数据分析和网站优化人...

  • Rsyslog 详解

    Rsyslog 详解 日志整理 对日志进行分析,首先第一步要规整日志。 /etc/rsyslog.conf 是rs...

  • linux系统centOS6.5使用goaccess工具分析ng

    网站的log日志分析是每个站长经常做的必备工作,通过网站日志文件我们可以分析各大搜索引擎对网站的爬取情况。最近我的...

  • windows下合并文件

    需求 网站管理员抛来网站的原始日志,叫我分析下,有没有啥异常 分析 一开始我是按照每天的日志来分析,但是感觉没有汇...

  • Java调度线程池ScheduleExecutorService

    作者: 一字马胡 转载标志 【2017-11-03】 更新日志 链接 Java线程池详解(一)Java线程池详解...

网友评论

      本文标题:马哥教育网站日志分析分步详解

      本文链接:https://www.haomeiwen.com/subject/rlppyltx.html