Collecting logs with Redis or Kafka as a cache


Author: 唯爱熊 | Published 2020-02-18 22:10

    Using Redis as a cache for Filebeat

    1. Prerequisites

    • Filebeat cannot ship its output to Redis Sentinel or Redis Cluster
    • Logstash likewise cannot read from Redis Sentinel or Redis Cluster
    • High availability is therefore provided below by an Nginx TCP (stream) proxy plus a Keepalived VIP in front of two standalone Redis instances

    2. Install and configure Redis

    yum install redis -y
    # run on both db01 and db02; on db02 bind 10.0.0.52 instead of 10.0.0.51
    sed -i 's#^bind 127.0.0.1#bind 127.0.0.1 10.0.0.51#' /etc/redis.conf
    systemctl start redis
    

    3. Install and configure Nginx
    Configure the official Nginx repo first (a sketch follows), then install:
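    A minimal sketch of the repo file, assuming a CentOS 7 host and the stable repo documented on nginx.org (the nginx.org build ships with the stream module compiled in):

    cat >/etc/yum.repos.d/nginx.repo<<'EOF'
    [nginx-stable]
    name=nginx stable repo
    baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
    gpgcheck=1
    enabled=1
    gpgkey=https://nginx.org/keys/nginx_signing.key
    EOF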

    yum install nginx -y
    

    Add the following stream block after the closing } on the last line of nginx.conf, not inside conf.d (files in conf.d are included inside the http context, and stream cannot be nested there):

    stream {
      upstream redis {
          server 10.0.0.51:6379 max_fails=2 fail_timeout=10s;
          server 10.0.0.52:6379 max_fails=2 fail_timeout=10s backup;
      }
      
      server {
              listen 6380;
              proxy_connect_timeout 1s;
              proxy_timeout 3s;
              proxy_pass redis;
      }
    }
    nginx -t
    systemctl start nginx 
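    A quick check that the TCP proxy is up (a sketch; ss is from iproute2):

    ss -lntp | grep 6380                  # nginx should be listening on port 6380
    redis-cli -h 127.0.0.1 -p 6380 ping   # should answer PONG through the proxy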
    

    4. Install and configure Keepalived

    yum install keepalived -y
    Configuration on db01:
    cat>/etc/keepalived/keepalived.conf<<EOF
    global_defs {
        router_id db01
    }
    vrrp_instance VI_1 {
        state MASTER
            interface eth0
            virtual_router_id 50
            priority 150
            advert_int 1
            authentication {
                auth_type PASS
                auth_pass 1111
            }
            virtual_ipaddress {
                10.0.0.100
            }
    }
    EOF
    

    Configuration on db02:

    cat>/etc/keepalived/keepalived.conf<<EOF
    global_defs {
        router_id db02
    }
    vrrp_instance VI_1 {
        state BACKUP
        interface eth0
        virtual_router_id 50
        priority 100
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            10.0.0.100
        }
    }
    EOF
    systemctl start keepalived 
    ip a
    

    5. Test that requests are proxied to Redis

    redis-cli -h 10.0.0.100 -p 6380
    Stop Redis on db01 and check whether you can still connect (it should fail over to db02).
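    A minimal round-trip check through the VIP, useful both before and after failing over db01 (a sketch; test_key is just a throwaway key):

    redis-cli -h 10.0.0.100 -p 6380 set test_key hello    # expect OK
    redis-cli -h 10.0.0.100 -p 6380 get test_key          # expect "hello"
    redis-cli -h 10.0.0.100 -p 6380 del test_key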
    

    6. Configure Filebeat

    cat >/etc/filebeat/filebeat.yml <<EOF
    filebeat.inputs:
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/access.log
      json.keys_under_root: true
      json.overwrite_keys: true
      tags: ["access"]
    
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/error.log
      tags: ["error"]
    
    output.redis:
      hosts: ["10.0.0.100:6380"]
      keys:
        - key: "nginx_access"
          when.contains:
            tags: "access"
        - key: "nginx_error"
          when.contains:
            tags: "error"
    
    setup.template.name: "nginx"
    setup.template.pattern: "nginx_*"
    setup.template.enabled: false
    setup.template.overwrite: true
    EOF
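    Assuming Filebeat was installed from the official package and runs under systemd, restart it so the new output takes effect:

    systemctl restart filebeat
    systemctl status filebeat    # confirm it is active (running)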
    

    7. Test whether Filebeat ships events to Redis
    curl 10.0.0.51/haha
    redis-cli -h 10.0.0.51          # should contain data (active backend)
    redis-cli -h 10.0.0.52          # should contain no data (backup backend)
    redis-cli -h 10.0.0.100 -p 6380 # should contain data (through the proxy)
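    The queued events live in Redis lists named after the configured keys; a sketch of inspecting them with standard Redis commands:

    redis-cli -h 10.0.0.51 LLEN nginx_access         # number of queued access-log events
    redis-cli -h 10.0.0.51 LRANGE nginx_access 0 0   # peek at the first event
    redis-cli -h 10.0.0.51 LLEN nginx_error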

    8. Configure Logstash

    cat >/etc/logstash/conf.d/redis.conf<<EOF 
    input {
      redis {
        host => "10.0.0.100"
        port => "6380"
        db => "0"
        key => "nginx_access"
        data_type => "list"
      }
      redis {
        host => "10.0.0.100"
        port => "6380"
        db => "0"
        key => "nginx_error"
        data_type => "list"
      }
    }
    
    filter {
      mutate {
        convert => ["upstream_time", "float"]
        convert => ["request_time", "float"]
      }
    }
    
    output {
       stdout {}
       if "access" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_access-%{+yyyy.MM}"
          }
        }
        if "error" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_error-%{+yyyy.MM}"
          }
        }
    }
    EOF
    

    9. Start in the foreground and test

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf
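    If the pipeline fails to start, the configuration syntax can be checked on its own (a sketch; --config.test_and_exit is Logstash's config-check flag):

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf --config.test_and_exit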
    

    10. Stop the foreground run and start Logstash as a service

    Ctrl+C    # stop the foreground run, then start the service
    systemctl start logstash
    

    11. Final test
    ab -n 10000 -c 100 http://10.0.0.100/
    Check in es-head (or from the shell, as sketched below) that the index holds 10000 documents
    Stop Redis on db01, send more requests, and verify that Logstash still behaves normally
    Restore Redis on db01 and test again
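    A shell alternative to es-head for checking the document count (a sketch against the node used in the Logstash output):

    curl -s 'http://10.0.0.51:9200/_cat/indices/nginx_*?v'
    curl -s 'http://10.0.0.51:9200/nginx_access-*/_count?pretty'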

    Optimizing the Filebeat + Redis setup

    1. Before the optimization, adding a new log path requires changes in 4 places:

    • Filebeat: 2 places
    • Logstash: 2 places

    2. After the optimization, only 2 places need to change:

    • Filebeat: 1 place
    • Logstash: 1 place

    3. Filebeat configuration file

    vim /etc/filebeat/filebeat.yml
    filebeat.inputs:
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/access.log
      json.keys_under_root: true
      json.overwrite_keys: true
      tags: ["access"]
    
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/error.log
      tags: ["error"]
    
    
    output.redis:
      hosts: ["10.0.0.100:6380"]
      key: "nginx_log"
    
    setup.template.name: "nginx"
    setup.template.pattern: "nginx_*"
    setup.template.enabled: false
    setup.template.overwrite: true
    

    4. Optimized Logstash configuration

    vim /etc/logstash/conf.d/redis.conf
    input {
      redis {
        host => "10.0.0.100"
        port => "6380"
        db => "0"
        key => "nginx_log"
        data_type => "list"
      }
    }
    
    filter {
      mutate {
        convert => ["upstream_time", "float"]
        convert => ["request_time", "float"]
      }
    }
    
    output {
       stdout {}
       if "access" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_access-%{+yyyy.MM}"
          }
        }
        if "error" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_error-%{+yyyy.MM}"
          }
        }
    }
    
    

    Using Kafka as a cache

    1. Configure /etc/hosts on all three nodes (a sketch of appending the entries follows the list)

    10.0.0.51 kafka51
    10.0.0.52 kafka52
    10.0.0.53 kafka53
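    A sketch of applying the entries on each node:

    cat >>/etc/hosts<<EOF
    10.0.0.51 kafka51
    10.0.0.52 kafka52
    10.0.0.53 kafka53
    EOF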
    

    2. Install and configure ZooKeeper

    cd /data/soft/
    tar zxf zookeeper-3.4.11.tar.gz -C /opt/
    ln -s /opt/zookeeper-3.4.11/ /opt/zookeeper                   
    mkdir -p /data/zookeeper
    cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
    cat >/opt/zookeeper/conf/zoo.cfg<<EOF
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data/zookeeper
    clientPort=2181
    server.1=10.0.0.51:2888:3888
    server.2=10.0.0.52:2888:3888
    server.3=10.0.0.53:2888:3888 
    EOF
    # different on each machine (see the sketch below for the other nodes)
    echo "1" > /data/zookeeper/myid
    cat /data/zookeeper/myid
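    The id must match the server.N entries in zoo.cfg; on the other two nodes (a sketch):

    # on 10.0.0.52
    echo "2" > /data/zookeeper/myid
    # on 10.0.0.53
    echo "3" > /data/zookeeper/myid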
    

    3. Start ZooKeeper
    Start it on every node:

    /opt/zookeeper/bin/zkServer.sh start
    

    4. Check every node

    /opt/zookeeper/bin/zkServer.sh status
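    Exactly one node should report itself as the leader; roughly the expected last line of the status output (a sketch, the banner lines above it vary by version):

    Mode: leader      # on exactly one node
    Mode: follower    # on the other two nodes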
    

    5. Test ZooKeeper

    On one node, create a znode:

    /opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
    create /test "hello"
    

    On another node, check that the data is visible:

    /opt/zookeeper/bin/zkCli.sh -server 10.0.0.52:2181
    get /test
    

    6. Install and deploy Kafka
    On kafka51:

    cd /data/soft/
    tar zxf kafka_2.11-1.0.0.tgz -C /opt/
    ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
    mkdir /opt/kafka/logs
    cat >/opt/kafka/config/server.properties<<EOF
    broker.id=1
    listeners=PLAINTEXT://10.0.0.51:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=24
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF 
    

    On kafka52:

    cd /data/soft/
    tar zxf kafka_2.11-1.0.0.tgz -C /opt/
    ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
    mkdir /opt/kafka/logs
    cat >/opt/kafka/config/server.properties<<EOF
    broker.id=2
    listeners=PLAINTEXT://10.0.0.52:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=24
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF
    

    On kafka53:

    cd /data/soft/
    tar zxf kafka_2.11-1.0.0.tgz -C /opt/
    ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
    mkdir /opt/kafka/logs
    cat >/opt/kafka/config/server.properties<<EOF
    broker.id=3
    listeners=PLAINTEXT://10.0.0.53:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=24
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF
    

    7. Start in the foreground to test

    # test on every node
    [root@kafka51 ~]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    

    8. Verify the processes

    # check on every node
    [root@kafka51 ~]# jps
    17000 Kafka
    15273 QuorumPeerMain
    17084 Jps
    
    

    9. Test creating a topic

    /opt/kafka/bin/kafka-topics.sh --create  --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic kafkatest
    

    10. Test describing the topic

    /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest
    

    11. Test deleting the topic

    /opt/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest
    

    12. Test sending messages with the Kafka console tools

    # create a topic
    /opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic  messagetest
    # start a console producer and type a few messages
    /opt/kafka/bin/kafka-console-producer.sh --broker-list  10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 --topic  messagetest
    # consume the messages on another node
    /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic messagetest --from-beginning
    # list all topics
    /opt/kafka/bin/kafka-topics.sh  --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
    

    13. Once the tests pass, start Kafka in the background

    Ctrl+C    # stop the foreground run
    /opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties
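    A sketch of confirming the broker came up, assuming the default script log directory /opt/kafka/logs (this matches the "look for 'started' in the log" note in the summary below):

    grep -i "started" /opt/kafka/logs/server.log | tail -1
    jps    # a Kafka process should still be listed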
    

    14. Update the Filebeat configuration

    cat >/etc/filebeat/filebeat.yml <<EOF
    filebeat.inputs:
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/access.log
      json.keys_under_root: true
      json.overwrite_keys: true
      tags: ["access"]
    
    - type: log
      enabled: true 
      paths:
        - /var/log/nginx/error.log
      tags: ["error"]
    
    output.kafka:
      hosts: ["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
      topic: 'filebeat'
    
    setup.template.name: "nginx"
    setup.template.pattern: "nginx_*"
    setup.template.enabled: false
    setup.template.overwrite: true
    
    EOF
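    After restarting Filebeat (systemctl restart filebeat, assuming the packaged systemd unit), generate a hit and confirm events land in the filebeat topic; a sketch using the same console consumer as in step 12:

    systemctl restart filebeat
    curl 10.0.0.51/kafka-test
    /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic filebeat --from-beginning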
    

    15. Update the Logstash configuration

    cat >/etc/logstash/conf.d/kafka.conf <<EOF
    input {
      kafka{
        bootstrap_servers=>["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
        topics=>["filebeat"]
        group_id=>"logstash"
        codec => "json"
      }
    }
    
    filter {
      mutate {
        convert => ["upstream_time", "float"]
        convert => ["request_time", "float"]
      }
    }
    
    output {
       stdout {}
       if "access" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_access-%{+yyyy.MM}"
          }
        }
        if "error" in [tags] {
          elasticsearch {
            hosts => "http://10.0.0.51:9200"
            manage_template => false
            index => "nginx_error-%{+yyyy.MM}"
          }
        }
    }
    EOF
    

    16. Start Logstash and test
    1. Foreground start

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf
    

    2. Background start

    # Ctrl+C to stop the earlier foreground run
    systemctl start logstash
    

    17. Cluster high-availability test
    Result: no matter which one or two servers lose ZooKeeper or Kafka (or both, or at random), log collection keeps working normally as long as at least one ZooKeeper and one Kafka instance remain.
    18. Summary of the Kafka experiment
    1. Prerequisites

    • Kafka and ZooKeeper are both Java-based, so a Java environment is required
    • Both are resource-hungry, so make sure there is enough memory

    2. Notes on installing ZooKeeper

    • Each machine's myid must be unique and must match the ids in the config file
    • After starting, check the roles: one leader and the rest followers
    • Test writing and reading data

    3. Notes on installing Kafka

    • Kafka depends on ZooKeeper; if ZooKeeper is unhealthy, Kafka cannot work
    • The Kafka config must list the IPs of all ZooKeeper nodes
    • The Kafka config must use the node's own IP address in listeners
    • The broker.id in the Kafka config should match the myid configured for ZooKeeper on that node
    • Kafka has started successfully only once "started" appears in its log

    4. Testing ZooKeeper and Kafka

    • Produce messages on one end
    • The other ends receive them in real time

    5. Configuring Filebeat

    • The output must list the IPs of all Kafka brokers

    6. Configuring Logstash

    • The input must list all Kafka broker IPs; don't forget the []
    • Switch to background start only after the foreground test succeeds
