日志收集之rsyslog to kafka

作者: modeyangg_cs | 来源:发表于2016-02-22 14:49 被阅读6159次

    项目需要将日志收集起来做存储分析,数据的流向为rsyslog(收集) -> kafka(消息队列) -> logstash(清理) -> es、hdfs; 今天我们先将如何利用rsyslog进行日志收集到kafka。

    一、环境准备

    通过对 rsyslog官方文档 查看,得知 rsyslog对 kafka的支持是 v8.7.0版本后才提供的支持.通过 ChangeLog 也可以看出 V8.X的版本变化.
    最新V8稳定版已经提供RPM包的Rsyslog-kafka插件了,直接yum安装即可,添加yum源:

    [rsyslog_v8]
    name=Adiscon CentOS-$releasever - local packages for $basearch
    baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch
    enabled=1
    gpgcheck=0
    gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon
    protect=1
    

    添加后 yum install rsyslog rsyslog-kafka.x86_64即可完成安装。

    二、配置

    1. 处理原则

    • input submit received messages to rulesets, zero or many
    • ruleset contains rule, rule consist of a filter and an action list
    • actions consist of the action call itself (e.g. ”:omusrmsg:”) as well as all action-defining configuration statements ($Action... directives)

    2. Statement Types 表达式类型

    通常利用RainerScript type statements进行非常简洁明了的配置声明,例如:

    mail.info /var/log/mail.log 
    

    3. 流程控制

    • Control structures
    • 过滤条件
      1. Selector: 传统方式,格式如下:
        <facility>[,facility...][,*].[=,!]<priority>[,priority...][,*];<facility>[,facility...][,*].[=|!]<priority>[,priority...][,*]...
        其中默认facility为auth, authpriv, cron, daemon, kern, lpr, mail, mark, news, security (same as auth), syslog, user, uucp and local0 through local7;
        默认priority为debug, info, notice, warning, warn (same as warning), err, error (same as err), crit, alert, emerg, panic (same as emerg);
        2) Property-based filters: new filter type. 形式如下:
        :property, [!]compare-operation, "value"
        分别对应 名字,比较符, 需要对比的字段。比较符包括 contains, isequal, startswith, regex, ereregex
        3) Expression based filters:
        if expr then action-part-of-selector-line
      2. BSD-style blocks:
      3. 例子: if $syslogfacility-text == 'local0' and $msg startswith 'DEVNAME' and not ($msg contains 'error1' or $msg contains 'error0') then /var/log/somelog

    4. 数据处理:支持set, unset, reset操作

    备注: Only message json (CEE/Lumberjack) properties can be modified by the set, unset andreset statements

    5. input

    有很多种input模块, 我们以imfile模块为例, 此模块将所有的文本文件内容逐行转到syslog中.

    input(type="imfile" tag="kafka" file="analyze.log" ruleset="imfile-kafka"[, Facility=local.7])
    

    6. outputs

    也叫作actions, 处理动作,格式如下

     action (
            type="omkafka"
            topic="kafka_test"
            broker="10.120.169.149:9092"
        )
    
    

    7. Rulesets and Rules

    Rulesets包括多条rule,一条规则就是rsyslog处理消息的一种方式, 每个规则包含filter和actions

    input(type="imfile" tag="kafka" file="analyze.log" ruleset="rulesetname")
    ruleset(name="rulesetname") {
        action(type="omfile" file="/path/to/file")
        action(type="..." ...)
        /* and so on... */
    }
    

    通过input里面的ruleset配置,将输入流进入ruleset进行规则匹配,然后执行action操作,完成对流的处理。

    8. Queue parameters

    将不同的输入流进入不同的队列并行处理数据,通常在ruleset或者action中配置,默认只有一个队列。配置参数例子

    action(type="omfwd" target="192.168.2.11" port="10514" protocol="tcp"
           queue.filename="forwarding" queue.size="1000000" queue.type="LinkedList"
          )
    
    

    9. templates

    这是rsyslog一个重要的特性,它可以让用户自定义输入流格式,同样也可以用于动态生成日志文件, 默认是原始格式。
    一般表达式如下:
    template(parameters) { list-descriptions }

    • list : 列表模板,包含name, type="list", 多个constant和property对。
    template(name="tpl1" type="list") {
        constant(value="Syslog MSG is: '")
        property(name="msg")
        constant(value="', ")
        property(name="timereported" dateFormat="rfc3339" caseConversion="lower")
        constant(value="\n")
        }
    
    • string: 字符串自定义格式模块, 由name, type="string", string="<onstant text and replacement variables>", 例如

    %TIMESTAMP:::date-rfc3339% %HOSTNAME%%syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n"

    将每个日志字段通过自定义变量和处理方式(property replacer)得到全局能读取的日志变量。

    注意:

    1. 原始格式: v6之前的格式,$template strtpl,"PRI: %pri%, MSG: %msg%\n"
    2. 利用action里的template参数将templates和action进行绑定,如
      action(template=TEMPLATENAME,type="omfile" file="/var/log/all-msgs.log")

    三. 实例

    增加一个将nginx access日志通过rsyslog传输到kafka的实例,将nginx_kafka.conf放入到/etc/rsyslog.d目录中,重启rsyslog即可。

    # 加载omkafka和imfile模块
    module(load="omkafka")
    module(load="imfile")
    
    # nginx template
    template(name="nginxAccessTemplate" type="string" string="%hostname%<-+>%syslogtag%<-+>%msg%\n")
    
    # ruleset
    ruleset(name="nginx-kafka") {
        #日志转发kafka
        action (
            type="omkafka"
            template="nginxAccessTemplate"
            confParam=["compression.codec=snappy", "queue.buffering.max.messages=400000"]
            partitions.number="4"
            topic="test_nginx"
            broker="10.120.169.149:9092"
            queue.spoolDirectory="/tmp"
            queue.filename="test_nginx_kafka"
            queue.size="360000"
            queue.maxdiskspace="2G"
            queue.highwatermark="216000"
            queue.discardmark="350000"
            queue.type="LinkedList" 
            queue.dequeuebatchsize="4096"
            queue.timeoutenqueue="0"
            queue.maxfilesize="10M" 
            queue.saveonshutdown="on"
            queue.workerThreads="4"
        )
    }
    
    # 定义消息来源及设置相关的action
    input(type="imfile" Tag="nginx,aws" File="/var/log/access.log" Ruleset="nginx-kafka")
    

    检查conf文件是否正确可以运行rsyslogd debug模式rsyslogd -dn运行,看日志输出结果,或者直接运行rsyslogd -N 1检查conf文件是否正确。

    相关文章

      网友评论

        本文标题:日志收集之rsyslog to kafka

        本文链接:https://www.haomeiwen.com/subject/tyhbkttx.html