美文网首页
大型网络日志中心的实现

大型网络日志中心的实现

作者: wsf535 | 来源:发表于2018-03-20 18:13 被阅读112次

    用elk建立网络日志中心实践,在前面的文章中简单的用了两台设备对elk进行简单的网络日志中心实践,要建立更大规模的日志中心是不够的,以下方案优化了各环境,实现大型网络中可用的日志中心。
    架构拓扑如下:

    ELK环境的搭建看前面文章用elk建立网络日志中心实践

    zookeeper

    安装:
    Hosts 修改
    [root@10-57-22-234 bin]# cat /etc/hosts
    10.57.22.167 zk1
    10.57.22.218 zk2
    10.57.22.234 zk3

    Java
    yum install java -y

    wget http://apache.fayea.com/zookeeper/stable/zookeeper-3.4.10.tar.gz

    tar -zxvf zookeeper-3.4.10.tar.gz
    mv zookeeper-3.4.10 /usr/local/zookeeper
    cd /usr/local/zookeeper/conf/
    cp zoo_sample.cfg zoo.cfg

    vi zoo.cfg
    [root@10-57-22-234 bin]# cat /usr/local/zookeeper/conf/zoo.cfg |grep -v ^'#'
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zk
    clientPort=2181
    server.0=zk1:2888:3888
    server.1=zk2:2888:3888
    server.2=zk3:2888:3888

    mkdir -p /data/zk
    生成ID文件很重要,三个文件的ID分别为0 1 2,如果不做会启动失败,报 Invalid config, exiting abnormally
    touch /data/zk/myid
    echo 0 >/data/zk/myid

    启动
    ./zkServer.sh start

    查看
    [root@10-57-22-234 bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
    Mode: follower

    报错检查

    1、防火墙是否已经关
    2、配置文件用主机名,查看主机名能否解析,查看hosts文件
    3、dataDir目录需要手工建,并且需要创建myid文件,并在文件中写入不同的值。

    kafka

    下载:
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz
    tar -zxvf kafka_2.12-1.0.1.tgz
    mv kafka_2.12-1.0.1 /usr/local/kafka
    增加环境变量
    vi /etc/profile
    export KAFKA_HOME=/usr/local/kafka
    export PATH=/usr/local/kafka/bin:$PATH
    刷新环境变量
    source /etc/profile

    启动脚本:新版本的kafka自带zk我zk已经装好,就把zk的启动脚本关了。
    chmod + x kafka
    启动 /etc/init.d/kafka start
    停止 /etc/init.d/kafka stop
    [root@10-57-22-218 config]# cat /etc/init.d/kafka

    #!/bin/bash  
      
    kafka_home=/usr/local/kafka
      
    case $1 in   
       start)  # 服务启动需要做的步骤  
    #           echo "zookeeper start"  
    #           $kafka_home/bin/zookeeper-server-start.sh -daemon $kafka_home/config/zookeeper.properties  
               #sleep 1  
               echo "kafka start"  
               $kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties  
               #sleep 1  
               ;;  
       stop)   # 服务停止需要做的步骤  
               echo "kafka stop"  
               $kafka_home/bin/kafka-server-stop.sh  
               #sleep 1  
    #           echo "zookeeper stop"  
    #           $kafka_home/bin/zookeeper-server-stop.sh  
               #sleep 1  
               ;;  
       restart) # 重启服务需要做的步骤  
                ...  
               ;;  
       status) # 查看状态需要做的步骤  
                 ...  
               ;;  
       *) echo "$0 {start|stop|restart|status}"  
               exit 4  
               ;;  
    esac  
    

    测试使用:
    1、创建主题
    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    2、查看主题
    kafka-topics.sh --list --zookeeper localhost:2181
    查看主题详情:
    kafka-topics.sh --describe --zookeeper zk1:2181 --topic test

    3、开启一个终端,发送消息,生产者的消息要发往kafka
    kafka-console-producer.sh --broker-list localhost:9092 --topic test

    4、另起一个终端,消费消息,消费者的消息来自zookeeper(协调转发)
    kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    报错处理:

    [2018-03-20 02:14:20,347] WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
    
    

    原因集群环境执行需要加上所有IP,如下:
    kafka-console-producer.sh --broker-list zk1:9092,zk2:9092,zk3:9092 --topic test

    三台kafka上logstash配置文件,新版本的logstash已经不支持以下的配置

    input {
        kafka {
            zk_connect => "zk1:2181,zk2:2181,zk3:2181"
            topic_id => "networklog"
            codec => plain
            reset_beginning => false
            consumer_threads => 5
            decorate_events => true
        }
    }
    output {
        elasticsearch { hosts => ["10.57.22.126:9200","10.57.22.128:9200"] }
    }
    

    报错

    [2018-03-20T02:38:00,052][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.2"}
    [2018-03-20T02:38:00,144][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    [2018-03-20T02:38:00,487][ERROR][logstash.inputs.kafka    ] Unknown setting 'zk_connect' for kafka
    [2018-03-20T02:38:00,488][ERROR][logstash.inputs.kafka    ] Unknown setting 'topic_id' for kafka
    [2018-03-20T02:38:00,488][ERROR][logstash.inputs.kafka    ] Unknown setting 'reset_beginning' for kafka
    

    需要修改成以下,同时注意端口变成9092kafka不再是zk

    input {
        kafka {
            bootstrap_servers => "zk1:9092,zk2:9092,zk3:9092"
            topics => ["networklog"]
        }
    }
    output {
        elasticsearch { hosts => ["10.57.22.126:9200","10.57.22.128:9200"] }
    }
    

    10.57.22.95 的logstash配置文件

    input {
        tcp {
            port => 514
            type => syslog
        }
        udp {
            port => 514
            type => syslog
        }
    }
    output {
       kafka {
              bootstrap_servers =>"10.57.22.167:9092,10.57.22.234:9092,10.57.22.218:9092"
              topic_id => "networklog"
              compression_type => "snappy"
              }
    }
    

    相关文章

      网友评论

          本文标题:大型网络日志中心的实现

          本文链接:https://www.haomeiwen.com/subject/vcpvqftx.html