美文网首页
日志采集Agent

日志采集Agent

作者: john不哭 | 来源:发表于2018-11-24 11:43 被阅读161次

    前言

    日志采集这一领域,好像没多少技术含量。但是如果往下挖技术细节,可以涉及到Linux文件系统的工作原理(文件回收规则);进程管理(如何保证进程不会被杀死,杀死后如何恢复等问题);如何保证日志不漏,然后到日志不重复;从轮询收集,到inotify事件收集;如何处理日志接收方背压的问题等等。

    业界方案

    采集agent的方案有以下两种代表:

    • 以Flume为代表
    • 以Elastic公司为代表的logstash ,beat系列。其中本文重点研究了并且参考Filebeat。

    参考资料

    https://yq.aliyun.com/articles/204554?spm=5176.10695662.1996646101.searchclickresult.6ed6ff98OMv6De
    http://www.man7.org/linux/man-pages/man7/inotify.7.html
    https://www.elastic.co/guide/en/beats/filebeat/current/index.html

    核心问题

    • 轮转时,文件引用次数为零(压缩完成后,发生服务器断电)。导致日志丢失 (通过硬连接hold住文件解决)
    • 如何判断为一条完整日志 (通过\N等)
    • 采集进程被杀如何恢复工作环境

    进阶问题:

    • 配置管理问题:如何下放配置,如何热更新配置等问题。
    • 资源限制问题,如何限制资源占用上限。
      完整代码地址:
      https://github.com/Whojohn/log_demo

    基本概念

    Demo

    Version0.1(模仿Tail 实现日志采集,拥有断点续传功能。)

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import time
    
    
    class Agent(object):
        def __init__(self):
            pass
    
        def time_count(fun):  # fun = world
            #  @functools.wraps(fun) 的作用就是保留原函数信息如__name__, __doc__, __module__
            @functools.wraps(fun)
            def wrapper(*args, **kwargs):
                """
                this is wrapper function
                :param args:
                :param kwargs:
                :return:
                """
                start_time = time.time()
                temp = fun(*args, **kwargs)  # world(a=1, b=2)
                end_time = time.time()
                print("%s函数运行时间为%s" % (fun.__name__, end_time - start_time))
                return temp
    
            return wrapper
    
        @time_count
        def collect(self, file_name):
            f = open(file_name, "r")
            length = 0
    
            #Recover from the lastest readed.
            try:
                loc = open("tail-seek", "r")
                pre_location = loc.readline()[:-1]
                if pre_location != "":
                    f.seek(int(pre_location))
                    length += int(pre_location)
                    loc.close()
            except:
                pass
            loc = open("tail-seek", "w", buffering=0)
    
            # Avoid the overload the agent cpu and disk.
            # __loop = 0
            while 1:
                temp = f.readline()
                print temp[:-1]
    
                # Avoid the overload the agent cpu and disk.
                # if __loop%1000 == 0:
                #   time.sleep(0.003)
    
                # Push the data to the server.
                if temp != "":
                     print temp
     
                length += len(temp)
                loc.seek(0)
                loc.write(str(length) + '\n')
                if temp == "":
                    f.close()
                    loc.close()
                    break
    
    if __name__ == "__main__":
        s = Agent()
        s.collect("./show")
    

    坑:

    • 参考:https://stackoverflow.com/questions/620367/how-to-jump-to-a-particular-line-in-a-huge-text-file
      windows中当前行字符总长度,不等于当前行文件偏移位置(linux暂时没发现这个问题)。
      解决方案:
      1 使用f.tell()确定当前文件偏移位。

    • 已知进程恢复,可能会导致agent重复上传(上传完成后,准备同步写保存文件偏移位时,同步写还未被写入文件。)。如果先同步写,再上传又会导致日志丢失。
      解决方案:
      1 可以通过双方模拟tcp的ack机制进行,第一版不打算实现这个功能。即通过服务端进行一个版本号(其实就是一个顺序号)进行处理。(完美方案,保证双方都能不重,不漏,但是有一定消耗。)
      2 Agent端逻辑不变,服务端自行对文件版本号进行对比,落后就丢弃。优雅一点服务端告知Agent已经收到版本号(行号),然后指定Agent按照最新版本号上传,类似方案1。

    • 记录文件偏移位,同步写的消耗问题。简单测试了一下,没有同步写大概能快10%左右的样子。
      优化方案:
      1 Agent同步写的逻辑改为1000行写一次offset地址。这样恢复的时候重复问题会变得严重,可以通过服务器丢弃落后的日志解决(也可以引入版本号解决)。

    2018/11/26更新

    Version1.0(引入网络部分)

    特性:

    • 硬连接保证日志收集完成才释放文件。
    • 引入网络部分,简单的server接收者,作为Demo。
      代码见https://github.com/Whojohn/log_demo(version1.0部分)

    问题:

    1. 依旧为单进程,但是已经写好各种基础模块。
    2. Server端作为展示,没有把数据落盘,数据落盘时,机器断电,可能存在丢数据的风险。(依旧是版本号解决问题)。
    3. 收集方式依旧为轮询方式。
    4. 没有背压感知功能(可以通过参考Mysql 刷脏的做法,通过引入版本号,对比发送版本号和确认版本号,当发现确认版本号落后发送版本号10%时,只接收,不发送,直至服务器版本号追赶至5%。)。

    服务端数据落盘设计

    方案:
    • 同一主题的日志放置在同一内存中(deque)。
    • 为了能够利用顺序写,尽可能压榨性能,不同主题的日志合并写入到同一文件中。

    如何写:
    1 轮询所有不同主题的内存,满足一定条件(一定条数,一定时间等多种条件下)写入。并且写入对应的文件映射关系(如:主题,1~100000条,开始\结束文件offset)。

    如何读:
    1 找到对应的文件映射表,找到对于的条目如31081在1~100000之间,读取offset,然后循环找到对应的条目。

    如何恢复:
    1 检查映射表。
    2 利用写入的offset等信息重新从Agent拉取数据。

    版本号初步设计

    方案一:
    • 机器唯一标识(ip)+log文件唯一标识+offset作为版本号, Server端需要重传时候,需要发对应的版本号以及特殊的重传标识到对应的Agent上即可。Agent提取出对应的offset,继续上传。
      坑:
    • offset可能数字可能会很大。

    2018/11/27更新

    Version1.1 (优化采集性能,网络)

    特性:

    • Server数据落盘雏形(没有实现版本号)。
    • Agent通过多条日志打包成一条TCP报文,Server端通过弃用rfile.readline(Python循环太慢,短报文时间完全浪费在循环上。)。大幅度提高Agent采集性能,Server数据落盘性能。

    性能如下:

    1. 环境:Aliyun轻量服务器,1Core2G,40G SSD。
    2. 基准性能:
      通过dd测出磁盘性能连续读写性能大概在130MB/s之间。
    3. 日志类型:
      1. 短日志。如(787897797987897)
      2. 正常日志。如Nginx日志。
        优化前:
        1 1亿条,880Mb短日志。大概需要50秒。
        2 一千万条,3.7G日志。大概需要120秒。

    优化记录:

    方案:

    1. 优化Socket buff大小。
      • 修改Tcp buff为32k。

    2.尝试引入压缩机制。

    1. 优化文件读取方式。
      • 每一次读取更多的字节数,减低日志收集细粒度,以8k的细粒度进行读取文件。Python函数具体实现中readlines(size),指定缓存最小就是8k,他会接近于8k以保证数据完整性。(读的粒度越大,文件check_point同步写次数越少,变相提高整体性能。)
      • 后期可以参照readlines,利用read做一个类似的功能(以4k的方式读入,4k刚好能够吻合ssd的一次读写)。
    2. 调整Queue的大小等细节,在保证性能的前提下,减少内存使用,以防止OOM。

    优化后:

    1 1亿条,880Mb短日志。大概需要14秒(受益于文件读取优化)。
    2 1千万条,3.7G日志。大概需要55秒。

    2018/11/29更新

    考虑到单核存在进程调度的消耗。

    在腾讯云再次测试:
    条件:
    Agent amd 2c4g 50g ssd
    Server amd 2c4g 50g ssd
    通过dd测出磁盘性能连续读写性能大概在130MB/s之间。

    结果:

    1. 1亿条,880Mb短日志。大概需要15秒(受益于文件读取优化)。
    2. 1千万条,3.7G日志。大概需要45秒。

    2018/12/3更新

    大幅度减少Cpu占用,磁盘性能成为系统瓶颈,引入LZ4作为提高性能选择。(ps:LZ4算法需要依赖相应的python packet。)

    彻底解决网络bug问题,由于Socket中 recv(len)不保证接收完整的len长度的data, 会出现数据异常问题。(通过判断每一次recv大小,直至达到len大小作为一次传输完整的数据。查了4天bug~~~)

    由于毕业问题,暂停埋坑2周,任何问题请联系我18689235591@163.com,我叫John即可。

    相关文章

      网友评论

          本文标题:日志采集Agent

          本文链接:https://www.haomeiwen.com/subject/qrmrqqtx.html