美文网首页ElasticSearch
CentOS7 Logstash6.2 简单记录

CentOS7 Logstash6.2 简单记录

作者: 坚持到底v2 | 来源:发表于2017-11-01 10:24 被阅读1000次

零、Elastic Stack产品:

https://www.elastic.co/guide/index.html

1、Beats

Beats 是开源的data shipper(数据搬运者),以agent的形式运行,发送各种类型的可操作的数据给elasticsearch。beats可以直接发送数据给elasticsearch或通过先发送给Logstash。Packetbeat、Filebeat、Metricbeat和Winlogbeat是Beats的一些例子,你也可以可以创建你自己的beat。

2、Elasticsearch

Elasticsearch 负责存储、检索和分析。
Elasticsearch是一个易扩展的开源的全文本搜索和分析引擎。
它允许你快速地(接近实时地)存储、搜索和分析大量的数据。
场景应用例子:
(1)你运行一个在线的web存储并允许你的客户搜索你卖的商品,你可以使用Elasticsearch存储你所有的商品catalog和存货清单并提供搜索和自动完成建议;
(2)你想收集日志或事务数据并且你想分析这些数据来预测、统计等。
(3)你运行一个价格提醒平台,你可以将提醒条件放入elasticsearch并使用它的反向搜索能力并推送提醒给客户。

3、Elasticsearch Hadoop

Elasticsearch Hadoop 包含3个类似的、但互相独立的3个子项目,专注于不同的方面。
elasticsearch-hadoop proper是在Hadoop环境中与elasticsearch交互,如果你使用Map/Reduce、Hive、Pig、ApacheSpark、Apache Storm、Cascading,这个项目适合你。
repository-hdfs是使用HDFS作为后端的repository来snapshot/restore from/to elasticsearch。
elasticsearch on YARN是在YARN上运行elasticsearch。

4、Logstash

Logstash 负责集中、转换 & 存储你的Data。

5、Kibana

Kibana 与elasticsearch一起工作,用于用于分析并形象展示数据。你使用它搜索、查看并与数据交互。

6、X-Pack

X-Pack 是elastic stack的扩展,集合了安全、提醒、监控、报告、图形化等能力到同一个包中。


Logstash Docs

https://www.elastic.co/guide/en/logstash/current/index.html


一、logstash介绍

1. Logstash负责集中、转换 & 存储你的Data。

Logstash是一个开源的、服务端的数据处理pipepline,
它接收多个源的数据、对它们进行转换,
然后将它们发送到你喜欢的"stash"(就是Elasticsearch)。

  • 接收数据用input
  • 解析并转换数据用filter
  • 输出数据用output

Logstash使用的是插件机制,各种插件在 https://github.com/logstash-plugins
当然也可以自己写插件

用X-Pack监控 Logstash (https://www.elastic.co/products/x-pack/monitoring)

2. 安装

注意 logstash需要 Java8 。并且不支持Java 9.

检查java version

java -version

2.1 使用二进制包安装

访问 https://www.elastic.co/downloads/logstash
找到你需要的包,解压。

# 下载tar包
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
# 解压
tar xf logstash-6.2.2.tar.gz 

# 进入到目录
cd logstash-6.2.2/

# 准备一个config file
cat > logstash.conf << EOF
input {
  beats {
    port => "5044"
  }
}

output {
  stdout { 
    codec => rubydebug 
  }
}
EOF

# 运行
bin/logstash -f logstash.conf

# 使用 -e  在命令行上指定配置内容
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
# 上面的命令执行可能需要等待一会

# 使用 -f或--path.config  指定配置文件 
# 使用 --config.test_and_exit  解析配置文件并报告可能的错误
bin/logstash -f some.conf --config.test_and_exit 

# 如果配置文件通过了检查,执行下面的命令
# 使用 --config.reload.automatic 表示修改了配置文件之后自动重新加载配置,这样你就不用重启Logstash
bin/logstash -f some.conf --config.reload.automatic

2.2 yum方式安装

# 下载并安装公共签名key
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# 生成 yum 源文件
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# 安装
sudo yum install logstash
pipeline的骨架:
# 使用#进行注释 
input {
}

filter {
  #filter是可选的
}

output {
}

3. 使用 logstash 解析日志

以Filebeat为例,
首先需要运行一个Filebeat服务端,
然后使用Logstash内置的Beats input来让Logstash可以接收Elastic Beats框架发出的事件。

3.1 配置Filebeat

https://www.jianshu.com/p/b7245ce58c6a
配置Filebeat的配置文件 让其output到Logstash的IP:port(5043端口,为什么是这个端口,看后面),
然后运行filebeat,
Filebeat会尝试连接到Logstash服务器端口。

3.2 接下来配置logstash

创建一个pipeline Configuration,这个pipeline使用beats input plugin来接收Beats的事件。
新建一个文件 first-pipeline.conf

input {
  beats {
    port => "5043"
  }
}

output {
  stdout { 
    codec => rubydebug 
  }
}

然后运行

bin/logstash -f first-pipeline.conf --config.reload.automatic

你会在console看到一些比较原始的信息

3.3 简单配置filter

grok filter是Logstash内置的几个插件之一。
grok filter插件使你能将非结构化的数据解析成结构化的、可查询的数据。
grok 使用正则表达式,例如 web server 的log 可以用 %{COMBINEDAPACHELOG} 来解析

将下面的配置放到配置文件中,然后观看输出,
可以看到除了原始的信息之外,还有解析出的各个字段的值

filter {
  grok {
     match => { 
        "message" => "%{COMBINEDAPACHELOG}"
     }
  }
}

除了将数据解析成可查询的数据之外,filter插件还可以基于提供的数据进行数据扩展。
例如geoip插件能根据IP地址,查找位置等信息

filter {
  grok {
     match => { 
        "message" => "%{COMBINEDAPACHELOG}"
     }
  }
  geoip {
    source => "clientip"
  }
}

现在web logs 已经分解成特定的字段(fields),Logstash pipeline可以将数据index到Elasticsearch集群中了。
将output更改为如下配置,Logstash将通过http协议连接Elasticsearch。

output {
  elasticsearch {
     hosts => [ "localhost:9200","IP2:port2",... ]
  }
}

还可以在input和output中使用多个input和output插件,例如

output {
    elasticsearch {
        hosts => ["IP1:port1", "IP2:port2", "IP3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

4. Logstash是如何工作的?

Logstash处理event的pipeline有3个阶段:
inputs=>filters=>outputs

  • inputs 生成events,
  • filters 修改它们,
  • outputs 将他们发送到其他位置。

inputs和outputs支持codecs,让数据在进入或离开pipeline时进行decode或encode,而不用使用额外的filter。

  • input codecs在数据进入到input之前对其进行decode操作,
  • output codecs在数据离开output之前对其进行encode操作。

默认的plain codec是每一行算作一条message,如果是其他的codec,例如multiline或json,就不是这样了,关于codec的理解一定要清晰

4.1 一些常用的inputs

  • file: 读取一个文件,类似于UNIX的tail -fn0 someFile

  • syslog: 监听514端口的syslog messages,并使用RFC3164格式进行解析

  • redis: 从redis服务器读取,使用redis channels和redis lists。
    Redis经常作为一个中间人,先将数据发送到Redis,然后logstash再从Redis读取。

  • beats: 处理Filebeat发出的事件,Filebeat用于将文件内容发送到远端logstash。

4.2 一些常用的filters

你可以设置条件来触发filters

  • grok: 解析并结构化任意数据。有很多内置的patterns供你使用。

  • mutate: 在event fields上执行通用的转换。
    你可以rename、remove、replace、modify event fields。

  • drop: 丢弃events,例如debug events。

  • clone: 制作eventde一份拷贝,可能增加或移除一些fields。

  • geoip: 为IP地址增加一些关于地理位置的信息(可以在Kibana中展示amazing charts)

4.3 一些常用的outputs

一个event可以穿过多个outputs,直到所有的output处理完毕,event也就结束了。

  • elasticsearch: 发送数据到elasticsearch。

  • file: 发送数据到文件。

  • graphite: 发送数据到graphite,这是一个开源的工具用于存储和graphing metrics。

  • statsd: 发送数据到statsd,这是一个服务,它监听统计数据,例如counters和timers,通过UDP发送,并发送聚合数据到一个或多个可插拔的后端服务。

4.4 一些常用的Codecs

codecs是基础的流filters,它可以作为input和output的一部分。

  • json、plain、msgpack

  • multiline:合并多行文本event,例如合并java Exception和stacktrace信息多行文本到一个单独的event。

4.5 Execution Model(执行模型)

每个input都在其自己的线程中运行,
inputs将event写入到一个通用的Java SynchronousQueue(Java同步队列)。
这个队列不持有events,而是将他们发送到一个空闲的worker(如果没有worker是空闲的就等待阻塞)。
每个pipeline worker thread从这个队列中取一批events,创建一个buffer,然后运行配置好的filters,然后运行配置好的outputs。

每批次获取的event数量(即size of batch)是可以配置的,worker的数量(即worker threads的数量)也是可以配置的。

默认情况下,Logstash在pipeline场景之间(即input->filter和filter->output)使用的是内存有界队列来缓存events。

如果Logstash不安全的退出,内存中存储的events会丢失。

你可以让Logstash将in-flight events持久化到disk。(略)

5. 设置并运行Logstash

5.1 文件夹结构

5.1.1 tar文件解压后的文件夹结构

  • bin/ 包含了 logstash启动脚本和logstash-plugin安装插件的脚本等可执行文件

  • config/ 包含了logstash.yml和jvm.options文件等配置文件(命令行参数--path.settings)

  • logs/ 包含了日志文件。(命令行参数--path.logs)

  • plugins/ 包含了本地的,非Ruby-Gem的插件文件,每个plugin都对应一个子文件夹。推荐只用于开发(实际没看到这个文件夹) 。(命令行参数 --path.plugins)

5.1.2 RPM安装包安装后的文件夹结构

  • /usr/share/logstash/ 默认安装路径,以下使用${Logstash_home}代替

  • ${Logstash_home}/bin 包含了 logstash启动脚本和logstash-plugin安装插件的脚本等可执行文件

  • /etc/logstash 包含了logstash.yml和jvm.options文件等配置文件(命令行参数--path.settings),这个文件夹还包含了startup.options。
    logstash.yml包含了启动Logstash需要的命令行参数。
    jvm.options的每一行都指令了JVM配置flags。
    startup.options文件包含了bin/system-install脚本需要的选项,system-install脚本用于生成与你系统相对应的启动脚本。当你安装Logstash包时,system-install脚本在安装的最后执行(使用startup.options文件)来设置logstash.yml中的参数,例如user、group、service name等信息。在你更改startup.options文件后需要再次运行system-install脚本才能生效。

  • /etc/logstash/conf.d 包含了pipeline的配置文件,可以指定文件夹、文件和通配符,不要在该文件夹中存放除了pipeline配置文件之外的文件! (命令行参数 --path.config)

  • /var/log/logstash #包含了日志文件。(命令行参数--path.logs)

  • ${Logstash_home}/plugins/ #包含了本地的,非Ruby-Gem的插件文件,每个plugin都对应一个子文件夹。推荐只用于开发(实际没看到这个文件夹) 。(命令行参数 --path.plugins)

5.2 logstash.yml文件

使用YAML格式编写。
如下所示:

pipeline:
  batch:
    size:125
    delay:5

也可以写成

pipeline.batch.size:125
pipeline.batch.delay:5

包含如下设置:

  • node.name 默认是机器名

  • pipeline.workers 如果你发现event正在堆积,或者CPU够用,建议增大这个值,默认是CPU的核数。

  • pipeline.output.workers 表示每个output插件实例使用的workers的数量,默认是1.

  • pipeline.batch.size 更大的值一般更有效率,但是考虑到内存,你或许需要增加JVM heap size,通过设置LS_HEAP_SIZE变量。默认是125。

  • pipeline.batch.delay 在分发一个batch到filters和workers之前最长等待的时间,默认是5(ms)

  • pipeline.unsafe_shutdown 当设置为true时,将强制Logstash退出,即使在shutdown期间仍然有inflight event。默认为false,表示Logstash会拒绝退出直到所有接收的event被push到了outputs中。

  • path.config pipeline配置文件或文件夹,如果使用了通配符或指定了文件夹,配置文件按alphabetical(字母)顺序读取。

  • config.test_and_exit 当设置为true时,检查配置文件是否有效并退出。注意grok patterns不会被检查。默认为false。

  • config.reload.automatic 当设置为true时,会周期性检查配置是否发生了更改并reload。也可以手动发送SIGHUP信号来reload。默认为false

  • config.reload.interval 默认为3(s)

  • config.debug 当设置为true时,会显示fully compiled Configuration as a debug log message。你必须同时设置log.level:debug。 警告!log信息会包含传递给插件的密码。

  • log.level 有fatal、error、warn、info、debug、trace。默认值是info。

  • log.format 日志格式。可选值有json、plain(使用的是Object#.inspect)。默认值是plain。

  • queue.type 可选值有memory和persisted,默认是memory。关于persisted的相关参数略

  • http.host metrics REST endpoint的绑定的地址。默认是127.0.0.1

  • http.port metrics REST endpoint的绑定的端口。默认是9600

  • path.logs Logstash日志存放的文件夹。默认是${Logstash_home}/logs

  • path.plugins 查找自定义plugins的文件夹。你可以多次使用来指定多个路径。期望的文件夹结构为: PATH/logstash/TYPE/NAME.rb 。TYPE是inputs/filters/outputs/codecs,NAME是插件的名称。

5.3 命令行参数

(同logstash.yml文件中的配置参数就略了)

  • --version # -V 显示版本

  • --help # -h

  • --node.name 默认机器名

  • --path.config CONFIG_PATH -f 指定文件或文件夹。
    如果多次指定,只取最后一个有效。
    可以使用通配符,例如 --path.config '/tmp/{one,two,three}'

  • --config.string CONFIG_STRING -e 在命令行上指定pipeline config字符串

  • --pipeline.workers COUNT -w 指定worker的数量

  • --pipeline.batch.size SIZE -b

  • --pipeline.batch.delay DELAY_IN_MS -u

  • --pipeline.unsafe_shutdown 等同于设置pipeline.unsafe_shutdown为true

  • --path.plugins PATH -p

  • --path.logs PATH -l

  • --log.level LEVEL

  • --config.debug

  • --interactive SHELL -i 丢带shell中而不是正常运行,可选的值有irb和pry

  • --config.test_and_exit -t 等同于设置config.test_and_exit为true

  • --config.reload.automatic -r

  • --config.reload.interval RELOAD_INTERVAL

  • --http.host HTTP_HOST

  • --http.port HTTP_PORT

  • --log.format FORMAT

  • --path.settings SETTINGS_DIR 指定包含logstash.yml、log4j2.properties等配置文件的文件夹。
    也可以通过设置环境变量LS_SETTINGS_DIR来指定。

5.4 Logging(Logstash自己的日志)

Logstash会记录内部的日志到LS_HOME/logs或/var/log/logstash目录下。
默认的log级别是INFO。
Logstash使用log4j2框架。

当调试时,将loglevel降到debug会收集到更详细的信息。
以前你只能对整个Logstash设置log级别,从5.0以后你可以为特定的子系统单独进行设置。
例如你正在调试output的问题,你可以只对output组件的loglevel进行设置。

Logstash使用一个log4j2.properties(该文件位于config文件夹中)来进行日志设置。
你可以直接修改这个文件,然后必须重启Logstash生效。

slowlog:

可以在logs文件夹中找到。
slowlog可以在logstash.yml中进行配置:

slowlog.threshold.warn (default: -1)
slowlog.threshold.info (default: -1)
slowlog.threshold.debug (default: -1)
slowlog.threshold.trace (default: -1)

默认情况下,这些值被设置为-1纳秒,表示不限制threshold,即没有slowlog。
例子

slowlog.threshold.warn 2s
slowlog.threshold.info 1s
slowlog.threshold.debug 500ms
slowlog.threshold.trace 100ms

在上面的例子中,在一个filter中处理时间超过2s的events会被记录。
日志会记录event的全部和对造成slowness负责的filter配置。

Logging APIs

虽然你可以直接修改log4j2.properties文件并重启Logstash,但是这很无聊并可能会导致不必要的downtime。
你还可以 通过logging api 动态更新logging level.

例子1:设置Logstash的outputs子系统的elasticsearch模块的logger log级别。

PUT /_node/logging
输出:
{ "logger.logstash.outputs.elasticsearch" : "DEBUG" }

例子2:获取各子系统的运行时 logging list

GET /_node/logging?pretty
#可以看到一堆的loggers及其loglevel。

5.5 关闭Logstash:

systemctl stop logstash
kill -SIGTERM {logstash_pid} # 或 kill -15 或 kill

在Logstash安全关闭前,它会:

  • 停止input、filter和output插件
  • 处理所有的in-flight events
  • 结束Logstash进程

下列的情形会影响shutdown的过程:

  • 一个input plugin正在缓慢的接收数据
  • 一个缓慢的filter,例如执行sleep(10000)的ruby filter或一个elasticsearch filter正在执行非常缓慢的查询。
  • 一个断开的output插件正等待重新连接以便flush inflight events。

这些情况会使得shutdown过程的时间和成功变得不可预知。
Logstash有一个stall(陷入泥潭)检测机制,分析在shutdown期间pipeline和插件的行为。这个机制产生周期性的信息,包含了在内部队列中inflight event的数量和繁忙的worker线程列表。

可以使用--pipeline.unsafe_shutdown来强制关闭。

6. 配置Logstash

6.1 配置文件示例:

注释使用#开头,并且不需要必须在行的开头

input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

插件可以要求配置项的值必须是特定类型,支持array、lists、boolean、bytes、codec、hash、number、password、uri、path、string

  • Array 示例:(现在已经基本弃用。只是仍然用来处理hashes或mixed types的lists,这些都不用核查。) users => [ {id => 1,name => bob},{id => 2,name => jane} ]

  • Lists 示例: path => [ "/var/log/messages", "/var/log/*.log" ]

  • Boolean示例:ssl_enable => true #注意true 或 false 没有引号

  • Bytes示例:

my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
  • Codec示例:codec => "json"

  • Hash示例:

match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
  • Number示例:port => 33

  • Password示例:不会被logger或打印出来 my_password => "password"

  • URI示例:my_uri => "http://foo:bar@example.net"

  • Path示例:my_path => "/tmp/logstash"

  • String示例:name => 'It's a beautiful day'

6.2 在pipeline配置文件中访问Event Data和Fields:

events都拥有属性,例如一个apache access log中应该会有状态码(200,404)、请求路径(/或index.html)、HTTP方法(POST/GET)、客户端IP地址等,我们称这些属性为Fields。

一些配置选项需要这些Fields存在才能工作。因为inputs用于生成events,所以这时候还没有任何fields!

使用 ${[FieldReference]} 来引用字段内容,例如 ${type} === ${[type]}

Field references(引用)
语法:[fieldname]

如果你引用的是最顶级的field,你可以省略[],只是简单的使用fieldname。
如果你要引用一个nested field,你可以指定全路径: [top-level field][nested field]。

例如 对如下数据

{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html"
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}

上面的数据有5个top-level fields,有3个nested fields。
要引用 os field,你可以使用[ua][os]。
例子:

output {
  statsd {
     increment => "apache.%{[response][status]}"
  }
}
Conditionals(条件执行)

只对特定条件的event进行 filter和output
Conditionals 支持if、else if和else的嵌套。
其语法是

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

EXPRESSION(表达式)是什么?是比较测试、布尔值逻辑等等

  • 你可以使用 == != < <= > >=

  • 你可以使用正则 =~ !~ (例如 someStr =~ pattern)

  • 你可以使用 in 、 not in

  • 支持 and or nand xor !

  • 使用 () 分组

表达式可以很长并且很复杂。
例子:

filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}
特殊的fields:
1、@metadata field

@metadata field 在logstash 1.5之后引入,主要用于conditionals。
@metadata的内容不是events的内容。

rubydebug codec 允许你暴露@metadata的内容,如下所示:

stdout { 
  codec => rubydebug { 
      metadata => true 
   } 
}

6.3 在pipeline配置文件中使用环境变量

  • 使用 ${var} 引用环境变量

  • 在Logstash启动的时候,每个引用会被替换成值,之后就不会再动态变化。

  • 区分大小写

  • 引用的环境变量不存在会导致Configuration错误。

  • 你可以使用 ${var:defaultValue}来指定一个默认值

6.4 logstash配置例子

处理apache2 access logs的例子
input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { 
        replace => { 
           "type" => "apache_access" 
        } 
    }
    grok {
      match => { 
            "message" => "%{COMBINEDAPACHELOG}" 
      }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { 
    codec => rubydebug 
  }
}
处理 syslog messages的例子
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { 
    hosts => ["localhost:9200"] 
  }
  stdout { 
    codec => rubydebug 
  }
}

6.5 重新加载配置文件:

重新加载配置文件的方式:
  • --config.reload.automatic 需要配置文件的内容允许重新加载,默认每3秒检查一次配置文件的变化。
  • kill -1 14175 ; 或 kill -HUP 14175

在配置文件reload期间,JVM不会重启。

对grok pattern files的更改也会被reloaded,但是不会触发reload。

6.6 处理多行的events

要处理多行的events,Logstash需要知道哪些行是一个单独events的组成部分。
优选的工具是multiline codec,它可以将一个单独的input的多个行合并到一个单独的行。
multiline codec最重要的配置部分是:

  • pattern :指定一个正则表达式。那些匹配到的lines被认为是作为之前行的继续或是新的multiline event的开始。你可以使用grok正则表达式模版。
  • what : 有2个可选值 previous或next。previous表示匹配到的行是前面的行的一部分。next表示那些匹配到的行是后面行的一部分。
  • negate: true或false(默认值是false)。表示反向匹配。
例子1 处理Java Stack Traces

java的栈追踪信息除了第一行之外其他都以空格开头,以下就表示所有以空格开头的line都是之前行的一部分

input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}
例子2 有些多行的日志在未结束时会在行尾加\

以下就表示所有以\结尾的line都是之后行的一部分

filter {
  multiline {
    type => "somefiletype"
    pattern => "\\$"
    what => "next"
  }
}
例子3 服务的活动日志一般都以一个时间戳开始,然后是其他信息

以下就表示所有不是(negate=true)以时间戳开头的行都是前面行的一部分

input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => previous
    }
  }
}

6.7 glob pattern支持:

  • * 匹配任意文件或任意字符。但是不匹配隐藏文件(.开头的文件),要匹配.开头的文件,使用 {*,.*}
  • ** 递归匹配任意的文件夹
  • ? 匹配任意单个字符
  • [set] 匹配区间内的任意单个字符,例如 [a-z] [^a-z]反向匹配
  • {p,q} 匹配 或者是p或者是q。可以不是单个字符,可以大于2个选项,等同于正则表达式中的(foo|bar)
  • \ 转义用

7. 部署并scaling Logstash:

7.1 最小化安装: 一个Logstash实例和一个elasticsearch实例。

7.2 将Logstash分为shipping instance 和 indexing instance

  • shipping instance 用于将input信息输出到message queue中,可以有多个这样的shipping instance
  • indexing instance 用于将messages queue中的信息index到elasticsearch集群中

8. 性能调优:

一次改一个配置,然后观察结果。不要一上来就更改pipeline.workers的数量。

性能检查列表(按顺序):

8.1 检查input source和output 目标的性能。

如果input源和output目标都不够快,那Logstash肯定也快不了。

8.2 检查系统信息:

  • CPU: 是否负荷很重。可以使用top -H查看。如果CPU使用率很高,那就调到第(3)步(检查JVM heap),然后阅读第(4)步(调优worker设置)。
  • memory:记住Logstash运行于一个Java VM内。这意味着Logstash总是使用分配给它的最大的内存。内存不足时可能会导致Logstash使用swap分区。
  • I/O使用情况:监控磁盘I/O以便检查disk的负荷。如果你使用了file output,有可能会填满你的磁盘 ;如果Logstash产生了很多的错误日志,也可能会导致磁盘占满; 你可以使用iostat、dstat或其他类似的工具来监控磁盘I/O。
  • 网络I/O:如果你的input和output使用了大量的网络操作。你可以使用dstat或iftop等工具来监控网络。

8.3 检查JVM heap:

很多时候CPU的性能降低都是因为 heap size太小,导致JVM频繁的进行垃圾回收。
一个简单的方法是 将heap size变成原来的双倍大小,然后观察性能改善。
不要将heap size设置超过实际的物理内存大小。
至少留1GB给OS和其他进程。
你可以使用jmap命令行工具或使用VisualVM来精确地度量JVM heap。

8.4 调优worker设置:

  • 使用 --pipeline.workers COUNT选项来增加workers的数量。将其扩展到CPU核数的整数倍是安全的。
  • 每个output默认只能使用一个单独的pipeline worker。你可以增加这个值,通过修改配置文件中的output的worker设置。
  • 你还可以调优output的batch size。对于很多的outputs,例如elasticsearch output,这个设置和I/O操作的size一致。例如elasticsearch output,这个设置和batch size一致。

8.5 调优并Profiling Logstash performance:

Logstash提供了如下的pipeline性能调优选项:

  • pipeline.workers
  • pipeline.batch.size
  • pipeline.batch.delay

9. 监控APIs

通用选项:
  • (a) ?pretty=true 表示返回的JSON信息会被pretty格式化(影响性能,只在debug时使用!)
  • (b) human=true 表示返回plain文本,而不是JSON格式。默认是false。
节点信息API
  • (a) GET /_node/<types> #types是可选的,默认是显示pipeline、os、jvm信息,写上type就只显示type的信息。
  • (b) GET /_node/plugins #获取所有的当前已安装的Logstash插件。这个API返回的是 bin/logstash-plugin list --verbose显示的内容。
  • (c) GET /_node/stats/<types> #检索Logstash运行时stats。types是可选的,用于只显示types类型的信息,可选的值有jvm、process、pipeline、reloads、os。
  • (d) GET /_node/hot_threads #输出JSON格式的文本,包含the top hot threads(负载最高的线程)的breakdown(分解)。可选的参数有
    • threads=3 #设置显示的hot threads的数量。
    • human=false #使用plain格式而不是JSON格式。
    • ignore_idle_threads=true #不显示idle的threads。

10. plugins插件

bin/logstash-plugin 脚本用于管理插件的生命周期。
你可以使用这个脚本安装、移除、升级插件。

10.1 列出插件:列出当前可用的插件

logstash-plugin list  #列出所有已安装的插件
logstash-plugin list --verbose  #显示插件的版本信息
logstash-plugin list '*namefragment*' #搜索创建
logstash-plugin list --group output #列出output(input、filter、codec)插件

10.2 安装插件:

export HTTP_PROXY=http://127.0.0.1:3128 #设置代理
logstash-plugin install logstash-output-kafka #首先你需要联网,能够访问RubyGems.org来获取插件,一旦安装完毕,你就可以在配置文件中使用它们。
logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem #本地安装

关于--path.plugins指定插件文件夹,前面已经说了,这个文件夹的目录架构应该是PATH/logstash/TYPE/NAME.rb,TYPE是input、output、filter、codec,NAME就是插件名称。

10.3 更新插件:

logstash-plugin update [logstash-output-kafka] #更新所有插件或指定插件

10.4 删除插件:

logstash-plugin remove logstash-output-kafka

10.5 离线插件包制作:

logstash-plugin prepare-offline-pack --output OUTPUT [PLUGINS]
# OUTPUT指定压缩的插件包要被写往何处,默认是/LOGSTASH_HOME/logstash-offline-plugins-5.2.2.zip
# [PLUGINS] 指定要打进插件包中的插件(一个或多个,可以使用通配符)

例子

bin/logstash-plugin prepare-offline-pack logstash-input-beats 
bin/logstash-plugin prepare-offline-pack logstash-filter-* 
bin/logstash-plugin prepare-offline-pack logstash-filter-* logstash-input-beats

10.6 离线插件包使用:

logstash-plugin install file:///path/to/logstash-offline-plugins-5.2.2.zip

10.7 Event API:

供插件开发者使用和Ruby filter使用,Ruby filter例子

filter {
  ruby {
    code => 'event.set("lowercase_field", event.get("message").downcase)'
  }
}

11. input插件

https://www.elastic.co/guide/en/logstash/current/input-plugins.html
elastic支持的插件查看 https://www.elastic.co/support/matrix#show_logstash_plugins
这里只列出常用的

beats

接收elastic beats框架发出的events

elasticsearch:

读取elasticsearch集群的查询结果,这对重现test logs、reindexing有帮助。
例子:

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}

这会创建一个如下格式的elasticsearch query:

curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": {
    "match": {
      "statuscode": 200
    }
  },
  "sort": [ "_doc" ]
}'
eventlog

拉取windows的event log。

exec

捕获shell命令的输出作为event。周期性的运行一个shell命令并捕获所有的输出作为event。

file

从文件流获取events,一般来说是根据tail -fn0来读取文件发生的变化,但是也可以从头开始读文件。一般是一行作为一个event,如果你想将多行合并到一个event中,你可能需要使用multiline codec 或 filter。这个插件目标是最终文件的变化,而不是从头到尾地读取文件然后将其整个内容合并到一个单独的event中。 插件会记录当前读取到的位置(通过sincedb文件),这就让即使logstash重启也不会丢失lines。默认情况下,sincedb文件位于运行logstash的用户根目录下,其文件名中有被读取的文件的名称,如果这个文件被重命名了,那么sincedb也就失去作用了。 处理File rotation:可以检测到,不管file是通过重命名还是copy操作被rotated了。

generator

生成随机的log event,用于测试

heartbeat

生成heartbeat event,用于测试

http

通过HTTP或HTTPS接收event

http_poller

解码一个HTTP API的输出到events

log4j

通过TCP socket接收来自log4j SocketAppender 的event。只对log4j 1.x有效。

pipe

接收来自一个长时间运行的command pipe的流events

redis

从Redis实例读取events

sqlite

基于SQLite数据库的rows创建events

stdin

从标准输入读取events

syslog

读取syslog message作为event。syslog是一个非常混乱的术语。这个input仅支持RFC3164加一些小的修改。日期格式必须是RFC3164格式或ISO8601。其它部分也必须遵守RFC3164。如果你不使用RFC3164,不要使用这个input。

tcp

从TCP socket读取event。和stdin和file inputs插件类似,每一个event被假定为一行文本。其可配置项有

add_field=>{}
codec=>"plain"
host=>"0.0.0.0"
id="someStr" #为插件的configuration增加一个唯一ID,如果没有提供ID,logstash会生成一个。强烈推荐你设置一个ID。
mode=>"server" #还可以设置为client,表示连接到一个server
port=>"some number"
proxy_protocol=>false #目前只支持v1
tags=>[ {id => 1,name => bob},{id => 2,name => jane} ] #设置任意数量的任意tags
type=>"someStr" #type一般用于启动filter。
udp

从UDP 读取events

unix

从UNIX socket读取event

xmpp

通过XMPP/Jabber协议接收events

log4j2

(实测安装有问题) 项目主页 https://github.com/jurmous/logstash-log4j2
安装 bin/logstash-plugin install logstash-input-log4j2 #如果不能安装,请先运行logstash
查看 bin/logstash-plugin list | grep log4j2
配置文件

input {
  log4j2 {
    port => 7000
    mode => "server"
  }
}

output {
  stdout { codec => rubydebug }
}

12. output插件

https://www.elastic.co/guide/en/logstash/current/output-plugins.html

elasticsearch

存储logs到Elasticsearch。

file

将event写到磁盘上。

http
pipe

将event的输出作为另一个程序的标准输入

redis

将events发送到一个Redis queue,使用RPUSH命令。

stdout
tcp
udp

13. filter插件

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

aggregate

将属于一个task的多个events的信息聚合在一起,并最终聚合到最后一个events中。一定要将worker数量设置成1,否则处理顺序错乱会导致未知的结果。

alter

允许你做一些 在mutate filter中没有包含的对field的操作。

cidr

检查IP地址是否包含在network blocks中

clone

复制Event

collate

校对Event,通过time或count

csv

解析逗号分割的值到不同的fields

date

解析fields中的date作为Logstash的timestamp 。
例子:第一个值是field的name,然后是可能的时间格式

date {
  match => [ "logdate", "MMM dd yyyy HH:mm:ss","MMM  d yyyy HH:mm:ss", "ISO8601" ] 
}
dissect

使用delimiters(定界符)来将数据分割到Fields

dns

执行一个标准的或反的DNS lookup

drop

丢掉所有events

elapsed

计算两个events之间消耗的时间

elasticsearch

拷贝前一个event的fields到当前的events

environment

存储环境变量作为metadata sub-fields。

extractnumbers

从一个字符串中提取数字

geoip

增加IP地址的地理信息

grok

使用正则来解析非结构化的数据。
Logstash有大约120个默认的patterns。
语法:

%{NUMBER:duration} %{IP:client}

NUMBER、IP是pattern,duration、client是解析后的field名称
整体的例子:

grok {
  match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
i18n

移除fields中的特殊字符

json

解析json events。
例子: target不写则默认被存储到event的root(top level)中

json { 
  source => "message" 
  target="doc" 
}
json_encode

将一个field序列化成JSON

kv

解析key-value pairs

metaevent

增加任意的fields到event

mutate

对fields执行变化操作。
例子

# 将field1的value转换成integer类型
mutate { 
  convert => { "field1" => "integer" }
}

# 替换
mutate { 
  gsub => [ "field1","/","_", "field2","[\\?#-]","." ]
} 

# field1如果不是array类型就没有效果
mutate { 
  join => { "field1" => "," } 
} 

mutate { 
  lowercase => [ "field1" ]
} 

# 将两个field合并
mutate { 
  merge => { "dest_field" => "added_field" } 
} 
range

对指定的字段检查size或length limits

ruby

执行任意的ruby代码

sleep

睡眠特定的时间

split

将多行的信息split成单独的events

translate

将field的内容替换。

useragent

将user agent字符串解析到fields中

uuid

增加一个uuid到events

xml

将xml解析到fields。

14. codec插件

https://www.elastic.co/guide/en/logstash/current/codec-plugins.html

json

如果数据是一个JSON array,会创建多个events。
如果你要解析\n分割的JSON消息,请使用json_lines。
如果接收到的payload不是JSON,它会使用plain text并增加一个_jsonparsefailure的tag,然后payload被存储到message field中。

json_lines
line
multiline

将多行messages压缩合并成一个单独的事件,最初的目标是合并java exception and stacktrace和使用\分割的多行。

plain
rubydebug

将你输出的event使用Ruby Awesome Print库格式化。

相关文章

网友评论

    本文标题:CentOS7 Logstash6.2 简单记录

    本文链接:https://www.haomeiwen.com/subject/gyrxpxtx.html