Filebeat, Logstash, Kafka in Three Steps

Author: 大猪大猪 | Published 2018-09-02 23:37

    Filebeat > Logstash > Kafka setup steps

    Filebeat collects the logs and ships them to Logstash, and Logstash then produces the data to Kafka. If your Kafka cluster does not require Kerberos authentication, Filebeat can also ship directly to Kafka.

    Usage

    Note: as of this writing (2018-09-02), the current Logstash release 6.4.0 has an Integer-to-Long conversion bug. The official word is that a fix is expected this month, so for now we downgrade and use Logstash 6.2.2.

    Download Logstash

    cd /usr/share
    wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
    tar -zxf logstash-6.2.2.tar.gz
    mv logstash-6.2.2 logstash
    

    Filebeat

    Install Filebeat

    yum install filebeat
    

    Filebeat configuration file /etc/filebeat/filebeat.yml

    filebeat.inputs:
    - type: log
      enabled: true
      publish_async: true
      paths:
        ## log file location
        - /etc/filebeat/logs/*.log
    filebeat.config.modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false
    setup.template.settings:
      index.number_of_shards: 3
    output.logstash:
      hosts: ["127.0.0.1:5044"]
    processors:
     - drop_fields:
         fields: []
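Before starting the service, Filebeat's built-in self-checks can be used to validate this file and the connection to Logstash (a sketch; it assumes Filebeat 6.x installed via yum, so the binary is on the PATH and the config sits at its default location):

```shell
# Validate the syntax of filebeat.yml
filebeat test config -c /etc/filebeat/filebeat.yml

# Check that the Logstash endpoint in output.logstash is reachable
filebeat test output -c /etc/filebeat/filebeat.yml
```

The output check only succeeds once Logstash is listening on port 5044, so it is also a quick way to confirm the two services can talk after everything is up.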
    

    Logstash

    Install the Kafka output plugin

    /usr/share/logstash/bin/logstash-plugin install logstash-output-kafka
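As a quick sanity check, you can confirm the plugin actually registered:

```shell
# List installed Logstash plugins and confirm the Kafka output is present
/usr/share/logstash/bin/logstash-plugin list | grep logstash-output-kafka
```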
    

    Configure Logstash

    mkdir /etc/logstash
    

    Logstash pipelines file /etc/logstash/pipelines.yml

     - pipeline.id: mylogs
       queue.type: persisted
       path.config: "/etc/logstash/conf.d/*.conf"
       pipeline.workers: 8
       pipeline.batch.size: 1000
       pipeline.batch.delay: 50
       pipeline.output.workers: 1
    

    Logstash configuration file /etc/logstash/logstash.yml

    path.data: /var/lib/logstash
    path.logs: /var/log/logstash
    pipeline.workers: 8
    pipeline.batch.size: 1000
    

    JVM configuration /etc/logstash/jvm.options

    -Xms10g
    -Xmx10g
    -XX:+UseParNewGC
    -XX:+UseConcMarkSweepGC
    -XX:CMSInitiatingOccupancyFraction=75
    -XX:+UseCMSInitiatingOccupancyOnly
    -Djava.awt.headless=true
    -Dfile.encoding=UTF-8
    -Djruby.compile.invokedynamic=true
    -Djruby.jit.threshold=0
    -XX:+HeapDumpOnOutOfMemoryError
    -Djava.security.egd=file:/dev/urandom
    

    Kafka JAAS file /etc/logstash/conf.d/kafka/kafka_jaas.conf

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      doNotPrompt=true
      useTicketCache=true
      principal="admin/admin@demo.com"
      useKeyTab=true
      serviceName="kafka"
      keyTab="/etc/security/keytabs/admin.keytab"
      client=true;
    };
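Before wiring this into Logstash, it is worth confirming that the principal and keytab actually yield a Kerberos ticket (the principal and keytab path below are the ones from the JAAS file above):

```shell
# Obtain a TGT using the keytab, then list it; if this fails, the same
# JAAS settings will also fail inside Logstash
kinit -kt /etc/security/keytabs/admin.keytab admin/admin@demo.com
klist
```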
    

    Logstash pipeline configuration (Beats input, Kerberos-authenticated Kafka output) /etc/logstash/conf.d/logstash.conf

    input {
        beats{
            port => 5044
        }
    }
    output{
        kafka {
            topic_id => "test_hello"
            bootstrap_servers => "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092"
            compression_type => "snappy"
            jaas_path => "/etc/logstash/conf.d/kafka/kafka_jaas.conf"
            kerberos_config => "/etc/krb5.conf"
            ## must match the serviceName in kafka_jaas.conf
            sasl_kerberos_service_name => "kafka"
            security_protocol => "SASL_PLAINTEXT"
            client_id => "log.demo.com"
        }
    }
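Logstash can dry-run this configuration before you start the pipeline for real, using the `-t` (`--config.test_and_exit`) flag:

```shell
# Parse and validate the pipeline config, then exit without starting it
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
```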
    

    Run the services

    Run Logstash

    /usr/share/logstash/bin/logstash --path.settings /etc/logstash
    

    Run Filebeat

    systemctl start filebeat
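With both services up, an end-to-end check is to append a line under the path Filebeat watches and read it back from the topic. This is a sketch: it assumes a Kafka client properties file (here called client.properties, a name chosen for illustration) carrying the same SASL_PLAINTEXT/Kerberos settings that Logstash uses.

```shell
# Write a test line into the directory Filebeat is watching
echo "hello kafka $(date '+%F %T')" >> /etc/filebeat/logs/test.log

# Consume it from the topic the Logstash output produces to
kafka-console-consumer.sh --bootstrap-server t2.demo.com:9092 \
  --topic test_hello --from-beginning \
  --consumer.config client.properties
```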
    

    Logs

    Normal Logstash startup log

    [root@log logstash]# /usr/share/logstash/bin/logstash --path.settings /etc/logstash
    Could not find log4j2 configuration at path /etc/logstash/log4j2.properties. Using default config which logs errors to the console
    [INFO ] 2018-09-02 23:33:27.758 [main] scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
    [INFO ] 2018-09-02 23:33:27.772 [main] scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
    [INFO ] 2018-09-02 23:33:28.657 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.2.2"}
    [INFO ] 2018-09-02 23:33:29.170 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
    [INFO ] 2018-09-02 23:33:31.075 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Starting pipeline {:pipeline_id=>"mylogs", "pipeline.workers"=>8, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>50}
    [INFO ] 2018-09-02 23:33:31.199 [[mylogs]-pipeline-manager] ProducerConfig - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [t2.demo.com:9092, t3.demo.com:9092, t4.demo.com:9092]
        buffer.memory = 33554432
        client.id = log.demo.com
        compression.type = snappy
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 10
        reconnect.backoff.ms = 10
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = SASL_PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
    [INFO ] 2018-09-02 23:33:31.311 [[mylogs]-pipeline-manager] AbstractLogin - Successfully logged in.
    [INFO ] 2018-09-02 23:33:31.315 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh thread started.
    [INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT valid starting at: 2018-09-02T23:33:31.000+0800
    [INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT expires: 2018-09-03T23:33:31.000+0800
    [INFO ] 2018-09-02 23:33:31.317 [kafka-kerberos-refresh-thread-admin/admin@demo.com] KerberosLogin - [Principal=admin/admin@demo.com]: TGT refresh sleeping until: 2018-09-03T19:41:17.691+0800
    [INFO ] 2018-09-02 23:33:31.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka version : 1.0.0
    [INFO ] 2018-09-02 23:33:31.338 [[mylogs]-pipeline-manager] AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
    [INFO ] 2018-09-02 23:33:31.653 [[mylogs]-pipeline-manager] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
    [INFO ] 2018-09-02 23:33:31.705 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] pipeline - Pipeline started succesfully {:pipeline_id=>"mylogs", :thread=>"#<Thread:0x49c5d932 run>"}
    [INFO ] 2018-09-02 23:33:31.752 [[mylogs]<beats] Server - Starting server on port: 5044
    [INFO ] 2018-09-02 23:33:31.780 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :pipelines=>["mylogs"]}
    
