ELK Deployment Notes

Author: dshowing | Published 2019-03-12 20:04

    The stack is deployed as ELK + Kafka + Filebeat. This is not a large-scale system, so everything runs as a single node; no ES/Kafka cluster was built. Filebeat and Kafka are deployed on the public network, while the ELK node sits on the LAN and pulls data from Kafka.

    Environment

    filebeat

    • Collects service logs (JSON / plain text)
    • Ubuntu14.04
    • filebeat-6.2.3

    kafka

    • kafka_2.12-2.1.0
    • centos7.6.1810
    • kafka-manager-1.3.3.22

    ELK

    • ubuntu 16.04.1
    • kibana 5.6.12
    • elasticsearch 5.6.12
    • logstash 5.6.12

    Architecture

    Public network

    • Filebeat collector nodes ship data to the Kafka node
    • The Kafka node on the public network receives the data produced by Filebeat

    LAN

    • The ELK node on the internal network consumes data from the public Kafka node via Logstash

    Kafka

    Kafka is a high-throughput message queue commonly used in big-data architectures. That high throughput also places high demands on bandwidth, both in stability and in transfer speed (foreshadowing: this comes up again later).

    The reasoning for using Kafka in this deployment:

    • Kibana is the most convenient way to visualize the honeypot data
    • The ELK node is resource-hungry, and for data safety it has to run locally
    • Data on the public network has to be shipped back to the local network, and the public data nodes are widely scattered
    • So the data is first gathered at a central node (Kafka), and Kafka's message-queue strengths are then used to ship it back together to the local ELK stack for visualization and analysis

    Installation

    Newer Kafka releases bundle a ZooKeeper module, so downloading the Kafka release package alone is enough.

    wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
    tar xvf kafka_2.12-2.1.0.tgz
    

    Inspect the Kafka directory

    [root@iZj6c5bdyyg7se9hbjaakuZ kafka2.12-2.1.0]# ls
    bin  config  libs  LICENSE  logs  NOTICE  site-docs
    [root@iZj6c5bdyyg7se9hbjaakuZ kafka2.12-2.1.0]# ls ./bin/
    connect-distributed.sh        kafka-console-producer.sh    kafka-log-dirs.sh                    kafka-run-class.sh                  kafka-verifiable-producer.sh     zookeeper-shell.sh
    connect-standalone.sh         kafka-consumer-groups.sh     kafka-mirror-maker.sh                kafka-server-start.sh               trogdor.sh
    kafka-acls.sh                 kafka-consumer-perf-test.sh  kafka-preferred-replica-election.sh  kafka-server-stop.sh                windows
    kafka-broker-api-versions.sh  kafka-delegation-tokens.sh   kafka-producer-perf-test.sh          kafka-streams-application-reset.sh  zookeeper-security-migration.sh
    kafka-configs.sh              kafka-delete-records.sh      kafka-reassign-partitions.sh         kafka-topics.sh                     zookeeper-server-start.sh
    kafka-console-consumer.sh     kafka-dump-log.sh            kafka-replica-verification.sh        kafka-verifiable-consumer.sh        zookeeper-server-stop.sh
    [root@iZj6c5bdyyg7se9hbjaakuZ kafka2.12-2.1.0]# ls ./config/
    connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     trogdor.conf
    connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties       zookeeper.properties
    connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties
    [root@iZj6c5bdyyg7se9hbjaakuZ kafka2.12-2.1.0]# 
    

    Because the ZooKeeper and Kafka configuration files refer to the hostname, edit the hosts file before configuring anything.

    127.0.0.1       localhost       localhost.localdomain   localhost4      localhost4.localdomain4
    ::1             localhost       localhost.localdomain   localhost6      localhost6.localdomain6
    0.0.0.0         iZj6c5bdyyg7se9hbjaakuZ
    
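    To confirm the hostname resolves as intended, a quick optional check (not part of the original steps):

    # should print the address mapped above
    getent hosts iZj6c5bdyyg7se9hbjaakuZ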

    ZooKeeper configuration file (config/zookeeper.properties)

    # ZooKeeper data directory; it mainly stores broker, topic and consumer metadata
    dataDir=/opt/datas/zookeeper
    # Default client port
    clientPort=2181
    maxClientCnxns=0
    

    Kafka configuration file (config/server.properties)

    # Any integer; in a cluster every broker id must be unique
    broker.id=1
    # Kafka listener addresses; note that the hostname is used
    listeners=PLAINTEXT://iZj6c5bdyyg7se9hbjaakuZ:9092
    advertised.listeners=PLAINTEXT://iZj6c5bdyyg7se9hbjaakuZ:9092
    # Performance parameters
    num.network.threads=5
    num.io.threads=10
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    # Data directory; Kafka stores its data as files on disk
    # One folder per partition (more partitions, more folders); in addition, the ZooKeeper data records consumption offsets for Kafka messages
    log.dirs=/opt/datas/kafkalogs
    # Performance parameters
    num.partitions=3
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    # ZooKeeper connection; ZooKeeper runs on the same machine, so localhost with the default port 2181 is used
    zookeeper.connect=localhost:2181
    advertised.host.name=iZj6c5bdyyg7se9hbjaakuZ
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    
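    The data directories referenced in the two config files do not exist by default; assuming the paths above, create them before the first start:

    mkdir -p /opt/datas/zookeeper /opt/datas/kafkalogs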

    Note: Kafka and ZooKeeper must be started in order: ZooKeeper first, then Kafka. When shutting down, stop Kafka first and ZooKeeper last, otherwise the data can be corrupted.
    Start the services

    # Start ZooKeeper
    [root@iZj6c5bdyyg7se9hbjaakuZ kafka2.12-2.1.0]# cd /opt/kafka2.12-2.1.0/bin/
    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
    # Start Kafka
    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./kafka-server-start.sh -daemon ../config/server.properties
    # Check the processes: QuorumPeerMain is ZooKeeper, Kafka is the broker
    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# jps
    653033 QuorumPeerMain
    661787 Kafka
    661826 Jps
    
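    Besides jps, the broker itself can be probed with the scripts shipped in bin/ (optional checks, not from the original notes; output details vary by version):

    # ask the broker which API versions it supports; errors here usually point to a listener/hostname problem
    ./kafka-broker-api-versions.sh --bootstrap-server iZj6c5bdyyg7se9hbjaakuZ:9092
    # list the broker ids registered in ZooKeeper; with broker.id=1 this should show [1]
    ./zookeeper-shell.sh localhost:2181 ls /brokers/ids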

    Create the message topics
    Two topics, corresponding to the cowrie and fanuc honeypot data respectively.

    Two topics are created here, each with one replica and 8 partitions. The replication factor cannot exceed the number of brokers, and since this is a single-node deployment the maximum is 1. Multiple partitions allow parallel consumption and horizontal scaling, so 8 partitions are used. Too many partitions can also introduce delays during data synchronization, so bigger is not always better. In a large-scale deployment with multiple replicas and partitions, configuring the producer ACKs asynchronously can save a lot of time.

    The producer's delivery guarantee can be tuned with the request.required.acks setting (acks in newer clients), as shown in the sketch after this list:

    • 0: effectively an asynchronous send; the offset advances as soon as the message is sent and production continues, i.e. at-most-once
    • 1: the offset advances only after the leader replica has acknowledged the message, then production continues
    • -1: the offset advances only after all replicas have acknowledged the message, then production continues
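    A minimal sketch of setting this explicitly, using the console producer's --producer-property option (an illustration, not part of the original steps; Filebeat's Kafka output exposes a similar required_acks setting):

    # once the topics below exist, send one test message requiring acknowledgement from all in-sync replicas (acks=all, i.e. -1)
    echo "acks test" | ./bin/kafka-console-producer.sh --broker-list iZj6c5bdyyg7se9hbjaakuZ:9092 --topic lsj_cowrie --producer-property acks=all
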
    # Create the two topics: specify the ZooKeeper address, replication factor, number of partitions and topic name
    ./bin/kafka-topics.sh --create --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --replication-factor 1 --partitions 8 --topic lsj_fanuc
    ./bin/kafka-topics.sh --create --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --replication-factor 1 --partitions 8 --topic lsj_cowrie
    # 查看主题
    ./bin/kafka-topics.sh --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --list
    

    Check the consumed data. There is no data yet at this point; if there were, it would scroll quickly in the console. Exit with Ctrl + C.

    # Console consumer script for inspecting the data; --from-beginning consumes from the earliest offset
    ./kafka-console-consumer.sh --bootstrap-server  iZj6c5bdyyg7se9hbjaakuZ:9092 --topic lsj_fanuc --from-beginning
    ./kafka-console-consumer.sh --bootstrap-server  iZj6c5bdyyg7se9hbjaakuZ:9092 --topic lsj_cowrie --from-beginning
    
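    Another way to see whether anything has arrived is to dump the latest offset of each partition (an optional check; with no data yet, every partition should report 0):

    # print <topic>:<partition>:<latest offset> for every partition
    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list iZj6c5bdyyg7se9hbjaakuZ:9092 --topic lsj_cowrie --time -1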

    Check the ports (2181 / 9092)

    [root@iZj6c5bdyyg7se9hbjaakuZ ~]# netstat -antup 
    Active Internet connections (servers and established)
    Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
    tcp        0      0 0.0.0.0:39944           0.0.0.0:*               LISTEN      661787/java         
    tcp        0      0 0.0.0.0:40461           0.0.0.0:*               LISTEN      653033/java         
    tcp        0      0 0.0.0.0:2048            0.0.0.0:*               LISTEN      3167/sshd           
    tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      661787/java         
    tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      653033/java         
    tcp        0      0 127.0.0.1:2181          127.0.0.1:54480         ESTABLISHED 653033/java         
    tcp        0      0 172.31.62.108:36518     100.100.30.25:80        ESTABLISHED 2920/AliYunDun      
    tcp        0      0 127.0.0.1:54480         127.0.0.1:2181          ESTABLISHED 661787/java         
    tcp        0    120 172.31.62.108:2048      111.193.52.213:34419    ESTABLISHED 662776/sshd: root@p 
    tcp        0      0 172.31.62.108:2048      114.249.23.35:32569     ESTABLISHED 661344/sshd: root@p 
    tcp        1      0 127.0.0.1:46822         127.0.0.1:9092          CLOSE_WAIT  661787/java         
    udp        0      0 0.0.0.0:68              0.0.0.0:*                           2723/dhclient       
    udp        0      0 127.0.0.1:323           0.0.0.0:*                           1801/chronyd        
    udp6       0      0 ::1:323                 :::*                                1801/chronyd        
    [root@iZj6c5bdyyg7se9hbjaakuZ ~]# 
    

    At this point the Kafka side (on Alibaba Cloud) is fully configured.

    Other useful commands

    Change the number of partitions

    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./kafka-topics.sh --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --alter --topic lsj_fanuc --partitions 8
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./kafka-topics.sh --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --alter --topic lsj_cowrie --partitions 8
    OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    

    Delete a topic (after deletion the data may still be on disk, only marked for deletion; to remove it completely, also delete it from the data directory configured in log.dirs)

    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./kafka-topics.sh --delete --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181  --topic  mysql_topic 
    

    Describe a specific topic

    [root@iZj6c5bdyyg7se9hbjaakuZ bin]# ./kafka-topics.sh --zookeeper iZj6c5bdyyg7se9hbjaakuZ:2181 --topic lsj_fanuc --describe
    
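    Once a consumer is attached (for example the Logstash pipeline below, which uses group_id "test"), its per-partition lag can be inspected; a useful extra command that is not in the original notes:

    # list the consumer groups known to the broker
    ./kafka-consumer-groups.sh --bootstrap-server iZj6c5bdyyg7se9hbjaakuZ:9092 --list
    # show current offset, log-end offset and lag for each partition consumed by the group
    ./kafka-consumer-groups.sh --bootstrap-server iZj6c5bdyyg7se9hbjaakuZ:9092 --describe --group test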

    Kafka troubleshooting

    filebeat

    Filebeat is a lightweight log shipper. Unlike Logstash and similar tools it does not need a Java runtime (it is written in Go), and it can be installed automatically on each honeypot node by pushing a script with Ansible.

    Download

    Also an Elastic product; it can be downloaded from the official site.

    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-linux-x86_64.tar.gz
    tar xvf filebeat-6.2.3-linux-x86_64.tar.gz
    

    Configure and run

    vim filebeat.yml

    filebeat.prospectors:
    - type: log        # the type field is normally log
      enabled: true    # enable this prospector
      paths:
        - /home/cowrie/cowrie/log/cowrie*
      fields:          # used to route logs to the right Kafka topic
        log_topics: lsj_cowrie

    - type: log
      enabled: true
      paths:
        - /root/fanucdocker_session/network_coding/log/mylog_*
      fields:
        log_topics: lsj_fanuc


    output.kafka:
      enabled: true
      hosts: ["47.244.139.92:9092"]
      topic: '%{[fields][log_topics]}'
    
    

    Start

    # Run in the foreground
    /root/filebeat-6.2.3/filebeat -e -c /root/filebeat-6.2.3/filebeat.yml
    # Run in the background
    nohup /root/filebeat-6.2.3/filebeat -e -c /root/filebeat-6.2.3/filebeat.yml >/dev/null 2>&1 &
    
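    Before starting, the configuration can be validated with Filebeat's built-in check (available in the 6.x series):

    # parse filebeat.yml and report syntax problems without shipping anything
    /root/filebeat-6.2.3/filebeat test config -c /root/filebeat-6.2.3/filebeat.yml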

    ELK

    All ELK components use version 5.6.12.

    elasticsearch

    Download Elasticsearch

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.12.tar.gz
    tar xvf elasticsearch-5.6.12.tar.gz
    

    Configuration file: vim elasticsearch.yml

    # Cluster name
    cluster.name: lsj_elk
    path.logs: /var/log/elk/elastic
    network.host: 10.10.2.109
    http.port: 9200
    # Parameters needed for the elasticsearch-head plugin (CORS)
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    

    Start

    # Foreground
    ./bin/elasticsearch
    # Background (daemon)
    ./bin/elasticsearch -d
    
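    Once running, Elasticsearch can be checked over its HTTP API (addresses as configured above):

    # node and version banner
    curl http://10.10.2.109:9200
    # cluster health; a single-node setup normally reports yellow or green
    curl http://10.10.2.109:9200/_cluster/health?pretty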

    kibana

    Install via apt-get

    root@elk:/opt/elk# apt list | grep kibana 
    
    WARNING: apt does not have a stable CLI interface. Use with caution in scripts.
    
    kibana/now 5.6.12 amd64 [installed,local]
    root@elk:/opt/elk# 
    

    Configuration file: vim /etc/kibana/kibana.yml

    server.port: 5601
    server.host: "10.10.2.109"
    server.name: "elk_lsj"
    elasticsearch.url: "http://10.10.2.109:9200"
    

    Start

    systemctl start kibana
    systemctl enable kibana
    

    Check the port

    root@elk:/opt/elk# netstat -antup | grep 5601
    tcp        0      0 10.10.2.109:5601        0.0.0.0:*               LISTEN      17774/node      
    root@elk:/opt/elk# 
    
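    Kibana also exposes a status endpoint, which is handy as a check beyond the listening port (an optional check):

    # returns JSON describing the overall state of Kibana and its plugins
    curl -s http://10.10.2.109:5601/api/status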

    logstash

    Download Logstash

    wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.12.tar.gz
    tar xvf logstash-5.6.12.tar.gz
    

    The Logstash settings file logstash.yml does not need any changes; only a pipeline config (input/filter/output) has to be written for the desired data flow.
    Below is the pipeline config that consumes from Kafka: vim logstash-5.6.12/config/kafka_logstash.conf

    ###  Input section: controls how data flows in
    # There is only one Kafka broker here, but the cowrie and fanuc logs have different formats, so they are consumed separately; each input must get a unique client_id
    input {
        kafka {
            bootstrap_servers => "47.244.139.92:9092"  # Kafka broker(s) to bootstrap from; a single broker in this setup
            client_id => "test_cowrie"   # unique ID, required when there are multiple inputs
            group_id => "test"    # consumer group
            topics => ["lsj_cowrie"]    # source topics; several can be listed, comma-separated
            type => "Cowrie"    # type field on the events, used below to parse the two log formats separately
            consumer_threads => 8    # parallel consumers; usually matches the number of partitions, 8 here
            }
        kafka {
            bootstrap_servers => "47.244.139.92:9092"
            client_id => "test_fanuc"
            group_id => "test"
            topics => ["lsj_fanuc"]
            type => "Fanuc"
            consumer_threads => 8
        }
    }
    
    ### Filter section: parses the data formats
    filter {
            if [type] == "Cowrie" {
                    # cowrie logs are JSON, so the json filter parses them automatically
                    json {
                            source => "message" 
                            remove_field => ["message"] 
                            # the raw line would otherwise stay in the message field, which is redundant after parsing, so drop it
                    }
                    # after JSON parsing, geolocate the src_ip field with the offline GeoIP database
                    if [src_ip] {
                            geoip {
                                    source => "src_ip"
                                    target => "geoip"
                                    database => "/opt/elk/GeoLite2-City/GeoLite2-City.mmdb"
                                    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                                    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                            }
                            mutate {
                                    convert => [ "[geoip][coordinates]", "float" ]
                            }
                    }
            }
    
            # Fanuc logs are plain text, so the grok filter is needed
            if [type] == "Fanuc" {
            # one grok pattern per log line variant, matched with logstash filter regexes
            grok {
                    match => { "message" => "Connection from %{IP:src_ip}:%{BASE10NUM:src_port} closed." }
                    add_tag => [ "fanuc_connection_closed" ]
                    tag_on_failure => []
            }
            grok {
                    match => { "message" => "Accept new connection from %{IP:src_ip}:%{BASE10NUM:src_port}" }
                    add_tag => [ "fanuc_new_connection" ]
                    tag_on_failure => []
            }
            grok {
                    match => { "message" => "function:%{URIPATHPARAM:funcCode}" }
                    add_tag => [ "fanuc_FunctionCode" ]
                    tag_on_failure => []
            }
            grok {
                    match => { "message" => "request: %{BASE16NUM:request}" }
                    add_tag => [ "fanuc_request" ]
                    tag_on_failure => []
            }
            grok {
                    match => { "message" => "results: %{BASE16NUM:results}" }
                    add_tag => [ "fanuc_results" ]
                    tag_on_failure => []
            }
                    # geolocate the src_ip field
                    if [src_ip] {
                    geoip {
                            source => "src_ip"
                            target => "geoip"
                            database => "/opt/elk/GeoLite2-City/GeoLite2-City.mmdb"
                            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                    }
                    mutate {
                            convert => [ "[geoip][coordinates]", "float" ]
                    }
            }
        }
    }
    
    ### Output section: controls where data goes
    output {
            elasticsearch{
                    hosts => ["10.10.2.109:9200"]
                    index => "logstash_lsj_elk"
                    timeout => 300
            }
    }
    
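    Before starting the pipeline, the syntax of this file can be validated with Logstash's built-in config check:

    # parse the config and exit without running the pipeline
    ./bin/logstash -f ./config/kafka_logstash.conf --config.test_and_exit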

    Notes on Logstash log parsing

    Logstash uses the GeoIP database for geolocation.

    Logstash can handle JSON-formatted log files in three ways:

    • set format => json directly on the input (older Logstash versions)
    • use a json codec (codec => json) on the input
    • use the json filter plugin (the approach taken above)

    Start Logstash

    root@elk:/opt/elk/logstash-5.6.12# pwd
    /opt/elk/logstash-5.6.12
    root@elk:/opt/elk/logstash-5.6.12# ./bin/logstash -f ./config/kafka_logstash.conf 
    # Run in the background (supervisor also works)
    root@elk:/opt/elk/logstash-5.6.12# nohup ./bin/logstash -f ./config/kafka_logstash.conf >/dev/null 2>&1 &
    
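    If everything is wired correctly, the index named in the output section should appear in Elasticsearch and start accumulating documents; a quick check before building the Kibana index pattern (addresses as above):

    # the logstash_lsj_elk index should show up with a growing docs.count
    curl 'http://10.10.2.109:9200/_cat/indices?v'
    curl 'http://10.10.2.109:9200/logstash_lsj_elk/_count?pretty'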

    Errors && reflections

    This dragged on for about two weeks. On Wednesday I finally found the cause: the lab's bandwidth is simply not enough, and Kafka has a big appetite for it.

    Which led to the following loop:

    • Logstash pulls data from Kafka to consume
    • Because of the bandwidth limit, Logstash can never finish consuming within the allotted time
    • Kafka then decides Logstash has gone offline and drops the connection
    • Logstash requests data from Kafka again, and the cycle repeats with no data ever flowing in

    Back in my dorm on the residential connection, I discovered that the Alibaba Cloud instance has only 1 Mbps of bandwidth, i.e. a transfer rate of only one or two hundred KB/s, so that was hopeless.

    (2019/03/22) Since the Alibaba Cloud instance was paid for a full year and should not go to waste, two options currently look feasible:

    • Drop Kafka and use Redis as the message queue instead
    • Skip the message middleware entirely and proxy the local ELK Logstash port to a port on the Alibaba Cloud host

    Redis option

    Todo

    Logstash port-forwarding option

    Todo

    Kafka python API
