美文网首页
Logstash 处理业务日志

Logstash 处理业务日志

作者: laod_wh | 来源:发表于2020-01-16 17:04 被阅读0次

业务场景

最近在做CA卡相关业务处理,用户量700w,对于相关操作日志,按照传统的方式显然不合理的,出现问题排查也是个事,最后我考虑通过Logstash直接全写到es内,下面我就简单写一下我怎么弄的,当然还有很多别的可以实现:例如 Filebeat等。

安装部署

  • 安装Logstash,之前通过docker安装过,这里我使用安装包的方式。
  1. 访问链接,下载文件
https://www.elastic.co/cn/downloads/past-releases
  1. 选择对应的产品及版本


    image.png
  2. 解压
    image.png
    安装完成 ~~~

定制化配置

1.创建文件

/work/elasticsearch/logstash-7.1.0/config/logstash-omc.conf

2.日志打印例子

2020-01-14 17:13:18,419 DEBUG http-bio-8082-exec-1 - {"createAt":"2020-01-14 17:13:03","status":"0","developer":null,"servid":"760000","action":"UPDATE","devno":"keyno01","devtype":1,"modifyAt":"2020-01-14 17:13:03","accountid":"131022","servstatus":null,"uapuserid":"152022","serialno":"76a58c28cff741aba621a6098cebaf6c"}

3.logstash-omc.conf 编写配置文件例子

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  file{
      #配置相关业务日志全部存入此文件
      path=>["/work/docker/bandDev.log"]
      type=>"apache-log"
      start_position => "beginning"
      ignore_older => 0
  }
}

filter {
    grok {
        match => {
            #正则表达式 根据日志打印格式编写
            "message" => ["([\s\S]*) (?<Msg>({.*?}))"]
        }
        #覆盖之前的message
        overwrite => ["message"]
        #去除无用的字段
        remove_field => ["host","beat","offset"]
    }

    json{
        source => "Msg"
        target => "xa"
    }
}

output {
  elasticsearch {
    #es的连接地址
    hosts => ["localhost:9200"]
    #es中的索引名称
    index => "xa-logstash"
    #user => "elastic"
    #password => "changeme"
  }

启动Logstash

#进入bin目录下
e24-6:bin dailong$ pwd
/work/elasticsearch/logstash-7.1.0/bin
e24-6:bin dailong$ ./logstash -f /work/elasticsearch/logstash-7.1.0/config/logstash-omc.conf
Sending Logstash logs to /work/elasticsearch/logstash-7.1.0/logs which is now configured via log4j2.properties
[2020-01-16T16:30:18,257][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-01-16T16:30:18,285][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.1.0"}
[2020-01-16T16:30:27,887][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-01-16T16:30:28,175][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-01-16T16:30:28,250][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-16T16:30:28,254][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-16T16:30:28,292][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2020-01-16T16:30:28,299][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2020-01-16T16:30:28,464][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-01-16T16:30:28,634][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x1234b969 run>"}
[2020-01-16T16:30:29,188][INFO ][logstash.inputs.file     ] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"/work/elasticsearch/logstash-7.1.0/data/plugins/inputs/file/.sincedb_20b96030a2ca04be6e9a04f4363503a2", :path=>["/work/docker/bandDev.log"]}
[2020-01-16T16:30:29,246][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
[2020-01-16T16:30:29,343][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
[2020-01-16T16:30:29,352][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-01-16T16:30:29,992][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

logstash就是类似于咱的Listener,一个监听器,监听这个文件的变化,当变化了会根据咱之前写的那个配置文件的格式往es写数据。

结果

image.png

开发工具

image.png

参考链接

相关文章

  • Logstash 处理业务日志

    业务场景 最近在做CA卡相关业务处理,用户量700w,对于相关操作日志,按照传统的方式显然不合理的,出现问题排查也...

  • SpringBoot--实战开发--Logstash(六十二)

    一、Logstash简介   Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日...

  • 使用 elk 处理 docker 日志

    一、在 docker-compose 文件中配置日志输出到 logstash 二、 logstash 配置 处理后...

  • 配置logstash从kafka读json格式日志输入es

    logstash logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行...

  • 监控

    监控的维度: 前端 业务 应用层(日志:logstash/kafaka/es/Kibana。调用链:cat/zip...

  • 日志的收集,处理和储存

    Logstash负责日志的收集,处理和储存: 性能:Logstash 致命的问题是它的性能以及资源消耗(默认的堆大...

  • ELK --- 配置Nginx访问日志

    说明 因为nginx可以自定义访问日志,而logstash处理json格式日志比较方便,所以可以先将nginx访问...

  • Logstash采集Nginx日志写入ES

    Logstash采集Nginx日志方式 RPM安装Logstash采集Nginx日志 Docker安装Logsta...

  • elk分析nginx日志

    日志格式示例 nginx日志示例 配置logstash logstash 配置文件 添加pattern_dir ...

  • 流量回放原理

    流量回放是通过日志采集再处理来获取所需要的数据 日志采集 通过logstash工具从应用容器拉取日志信息,可以根据...

网友评论

      本文标题:Logstash 处理业务日志

      本文链接:https://www.haomeiwen.com/subject/txwfzctx.html