美文网首页
ELFK+Kafka集群搭建

ELFK+Kafka集群搭建

作者: 阿当运维 | 来源:发表于2023-03-21 15:12 被阅读0次

    背景

    为了缓解ES压力,在ELFK架构上引入Kafka作为数据缓冲,提升日志收集效率。

    规划

    • 系统版本为Ubuntu18.04
    • 本次kafka的版本为kafka_2.12-3.4.0.tgz,节点为3个节点
    192.168.2.112  192.168.2.113   192.168.2.114
    
    • zookeeper使用kafka内置的

    整体思路

    • 安装kafka (三台机器)
    • 把kafka内置的zookeeper配置为集群,再配置kafka
    • 简单测试kafka集群生产/消费,并把后面elfk要用的topic创建好
    • 将filebeat配置output配置修改为输出至kafka
    • 将logstash配置input配置修改为从kafka消费数据
    • 日志收集测试

    一.Kafka集群搭建

    以下步骤均三台机器分别执行,注意文件中程序路径

    1.1) 安装kafka

    解压目录并重命名(三台机器上目录分别为/opt/kafka-01,/opt/kafka-02,/opt/kafka-03)

    tar zxvf kafka_2.12-3.4.0.tgz -C /opt/
    cd /opt/ && mv kafka_2.12-3.4.0 kafka-01
    #后面两台一样操作,注意目录名字区分
    

    1.2) 将 kafka 内置的 zookeeper 配置为系统服务

    cat >/lib/systemd/system/zk.service <<EOF
    [Unit]
    Description=Zookeeper in Kafka package
    After=network.target
    
    [Service]
    Type=simple
    WorkingDirectory=/opt/kafka-01
    
    Environment=JAVA_HOME=/opt/jdk1.8.0_144
    
    Restart=on-failure
    LimitNOFILE=100000
    
    ExecStart=/opt/kafka-01/bin/zookeeper-server-start.sh /opt/kafka-01/config/zookeeper.properties
    
    [Install]
    WantedBy=multi-user.target
    EOF
    #开机启动
    systemctl daemon-reload
    systemctl enable zk
    

    1.3) 将 kafka 配置为系统服务

    cat > /lib/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    Requires=zk.service
    After=network.target
    After=zk.service
    
    
    [Service]
    Type=simple
    WorkingDirectory=/opt/kafka-03
    
    Environment=JAVA_HOME=/opt/jdk1.8.0_144
    
    Restart=on-failure
    LimitNOFILE=100000
    
    ExecStart=/opt/kafka-03/bin/kafka-server-start.sh /opt/kafka-03/config/server.properties
    
    [Install]
    WantedBy=multi-user.target
    EOF
    #开机启动
    systemctl daemon-reload
    systemctl enable kafka
    

    1.4)修改zookeeper配置文件

    配置数据目录,日志目录,地址配为集群,修改后配置如下:
    vim /opt/kafka-01/config/zookeeper.properties

    clientPort=2181
    admin.enableServer=false
    dataDir=/opt/kafka-01/zk_data_new
    dataLogDir= /opt/kafka-01/zk_log
    tickTime=2000
    initLimit=10
    syncLimit=5
     
    server.1=192.168.2.112:2888:3888
    server.2=192.168.2.113:2888:3888
    server.3=192.168.2.114:2888:3888
    

    1.5) 创建zookeeper所需目录以及创建myid

    上面的配置中server.1 代表192.168.2.112这个节点在zk集群中id为1,需要一个myid文件来存放这个id

    mkdir /opt/kafka-01/zk_data_new zk_log
    echo "1" > /opt/kafka-01/zk_data_new/myid
    #后面两台以此类推,分别id为2,3
    

    以上执行完,zookeeper集群就配置完了,下面配置kafka集群

    1.6) 修改kafka配置文件

    vim /opt/kafka-01/config# cat server.properties (以下的ip地址可配成内网域名解析)修改后为:

    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.2.112:9092
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=1048576
    socket.receive.buffer.bytes=1048576
    socket.request.max.bytes=125829120
    log.dirs=/opt/kafka-01/kafka-logs 
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=3
    log.retention.hours=168
    log.retention.bytes=1073741824
    log.segment.bytes=209715200
    log.retention.check.interval.ms=300000
    #zookeeper.connect=localhost:2181
    zookeeper.connect=192.168.2.112:2181,192.168.2.113:2181,192.168.2.114:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=3000
    delete.topic.enable=true
    queued.max.requests = 1000
    auto.create.topics.enable=false
    offsets.retention.minutes=43200
    log.cleaner.enable=true
    message.max.bytes=104857600
    log.roll.hours=2
    
    • 参数解读:
    =====================================================================================
    broker.id=1  #这里1是kafka-01的id,每台机器不同,唯一的
    listeners  #监听kafka地址:
    advertised.listeners  #本机对外声明的地址:
    zookeeper.connect  #配置zk集群地址
    log.dirs  #kafka日志目录,可逗号配置多个
    message.max.bytes   #允许的最大消息大小,要小于 socket.request.max.bytes,并且最好和消息生产者最大能产生的消息一致
    =====================================================================================
    num.network.threads 和 num.io.threads
    分别是网络和 IO 的线程数量
    =====================================================================================
    socket.send.buffer.bytes 和 socket.receive.buffer.bytes
    分别是发送缓冲区大小和接收缓冲区大小
    =====================================================================================
    socket.request.max.bytes
    服务器允许请求的最大值,用来防止OOM(应该要大于 message.max.bytes)(为了防止OOM,这个值应该小于JVM 的堆内存)
    =====================================================================================
    num.io.threads
    num.partitions
    创建 topic 时如果没有指定 partition 数量则使用这个值。
    =====================================================================================
    num.recovery.threads.per.data.dir
    在启动时恢复日志和关闭时刷盘日志时每个数据目录的线程的数量,一般磁盘可以不增加,默认为1
    =====================================================================================
    offsets.topic.replication.factor、transaction.state.log.replication.factor 和 transaction.state.log.min.isr
    在生产环境下并且有多个 kafka 节点时建议配置为3,这些是内部 topic 使用副本数量等信息。
    =====================================================================================
    log.retention.hours
    日志(消息)保留时间,单位是小时
    =====================================================================================
    log.retention.bytes
    日志(消息)最大保留多少空间,单位是字节。与 log.retention.hours 同时生效,独立控制。
    这里要注意,这个值原来是限制 partition 使用的大小而不是限制 broker 使用的大小
    =====================================================================================
    log.segment.bytes
    日志(消息)存储时多少内容拆分一个 segment 文件,单位是字节。topic 创建时可以单独指定这个值
    =====================================================================================
    log.retention.check.interval.ms
    多长时间去检查一次存储是否符合 log.retention.hours 和 log.retention.bytes,单位是毫秒
    =====================================================================================
    zookeeper.connect 和 zookeeper.connection.timeout.ms
    连接 zookeeper 的地址和超时时间
    =====================================================================================
    group.initial.rebalance.delay.ms
    有消费者加入到这个组后多久进行重新平衡,生产建议 3 秒,调试时可以配置为 0
    =====================================================================================
    delete.topic.enable
    是否允许删除 topic
    =====================================================================================
    queued.max.requests
    I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制
    =====================================================================================
    auto.create.topics.enable
    是否允许自动创建 topics
    =====================================================================================
    offsets.retention.minutes
    内部 topic 中 offset 的过期时间,一般建议比 log.retention.hours 长,否则 offset 先过期的话数据可能被重新发送
    =====================================================================================
    log.cleaner.enable
    是否启用日志压缩
    =====================================================================================
    log.roll.hours=2
    和 log.segment.bytes 共同决定多久(多大)更换一个 segment 存储文件
    =====================================================================================
    

    1.7) 启动zk和kafka

    systemctl start zk
    systemctl start kafka
    

    启动后可以看看是否正常启动,jps可查看到

    24017 Logstash
    7159 QuorumPeerMain
    16698 Kafka
    31771 Jps
    

    二. Kafka创建Topic

    2.1) 创建一个topic

    (replication-factor-副本数,partitions-分区数,组成集群后server指定任意一个节点创建即可,后面节点会自动同步到其他节点)

    bin/kafka-topics.sh --create --bootstrap-server 192.168.2.112:9092 --replication-factor 1 --partitions 1 --topic new_filebeat
    

    2.2) 简单测试生产/消费(命令行)

    --bootstrap-server 可随意指定集群任意节点,如果不同节点查不到同步的topic或者拿不到消息也说明集群没有搭好。
    查看topic

    ./bin/kafka-topics.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat --describe
    

    kafka生产者

    bin/kafka-console-producer.sh --broker-list  192.168.2.114:9092 --topic new_filebeat
    

    kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092  --topic new_filebeat
    

    正常结果如下:


    三.调整Filebeat配置

    将filbeat的输出由之前的output到logstash改成到kafka

    output.kafka:
      hosts: ['192.168.2.112:9092','192.168.2.113:9092','192.168.2.114:9092']
      topic: 'new_filebeat'
      enabled: true
      partition.round_robin:
        reachable_only: true
      required_acks: 1
      compression: gzip
      max_message_bytes: 104857600
    

    参数解读:

    reachable_only: true: 设置是否仅将消息发送到可到达的broker服务端,避免消息发送失败。
    required_acks: 1: 设置等待确认的应答数,1表示只需等待写入到leader分区后,就可以发送ACK确认消息。
    compression: gzip: 设置消息传输时的压缩算法,gzip表示使用gzip算法进行压缩。
    max_message_bytes: 104857600: 设置每个消息的最大字节数限制,104857600表示最大是100MB。
    这些参数可以根据实际情况进行调整,以避免消息传输失败,提高效率。
    

    重启服务重新加载配置systemctl restart filebeat

    四.调整Logstash配置

    将logstash的输入由之前的iutput从filebeat拿数据改成从kafka拿

    input {
       kafka {
          bootstrap_servers => ["192.168.2.112:9092,192.168.2.113:9092,192.168.2.114:9092"]
          topics_pattern => "new_filebeat"
          group_id => "logs"
          codec => "json"
          consumer_threads => 70
       }
    }
    

    参数解读:

    topics_pattern: 之前kafka中创建的topic名
    codec: 编码格式
    consumer_threads: 指定用于处理Kafka消息的线程数。默认为1,当应用程序需要同时处理大量Kafka消息时,配置大于1的数,可提高处理能力。
    group_id : 用于定义消费者的组 ID。消费者组是一组 Kafka 消费者,它们共享同一个组 ID,并从一个或多个 Kafka 主题消费消息。当多个消费者共享相同的组 ID 时,
    Kafka 会确保它们不会同时消费同一个分区中的消息,以保证分区内数据的负载均衡和整体消费速度的增加。在配置 Kafka 输入插件时,group_id 参数是必须配置的。
    如果没有配置 group_id,则会使用默认值 logstash,这可能会导致多个 Logstash 实例都使用相同的 group_id,从而造成消费冲突和数据重复。
    

    重启服务重新加载配置systemctl restart logstash./bin/logstash -f logstash.conf

    五.测试日志消息

    5.1)运行kafka消费者命令行指定我们上面创建的用于日志收集的topicnew_filebeat来监测消息

    ./kafka-console-consumer.sh --bootstrap-server 192.168.2.112:9092 --topic new_filebeat
    

    5.2)输入测试数据到filebeat收集的日志路径

    这里是指定收集/opt/nginx.log的日志

    echo '8.8.8.11 - - [22/Mar/2023:14:15:12 +0800] "GET / HTTP/1.1" 404 0 "-" Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 - 0.001  192.168.1.132:8005 900 0.000'>>/opt/nginx.log
    

    5.3)观察消费端发现有消息写到kafka的topic中了:

    5.4)观察kibana也能看到日志了:

    遇到的问题

    Q1.zookeeper集群启动失败

    ERROR Unable to load database on disk (org.apache.zookeeper.server.quorum.QuorumPeer)
     java.io.IOException: No snapshot found, but there are log entries. Something is broken!
      at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:281)
      at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:285)
      at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1094)
      at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
    ERROR Unexpected exception, exiting abnormally (org.apache.zookeeper.server.quorum.QuorumPeerMain)
      java.lang.RuntimeException: Unable to run quorum server
      at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:1149)
      at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:1079)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:227)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:136)
      at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
    

    原因:

    之前搭建zk的时候是单点去初始化启动的,产生了数据文件,后加入了集群配置再次启动,zk会根据zk的log目录下的version-2下的数据和日志信息去启动,已经不匹配了,所以报没有可用的快照文件从磁盘加载。

    解决:

    删除zk数据目录中verrsion-2下的内容,删除zk的日志目录下的version-2下的内容,再次重新启动。
    或者 重新修改数据目录和日志目录重新初始化。

    Q2. kafka集群启动失败

    ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.InconsistentClusterIdException: The Cluster ID kVSgfurUQFGGpHMTBqBPiw doesn't match stored clusterId Some(0Qftv9yBTAmf2iDPSlIk7g) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:84)
        at kafka.Kafka.main(Kafka.scala)
    

    原因:

    集群的id是第一次启动的时候就会自动产生,如果改过配置或者修改数据目录再次启动可能重新触发新的id产生,这会造成冲突

    解决:

    删除meta.proeteis (在kafka2.7.0版本中这个文件在kafka的log目录下)或者 修改meta.proeteis的id为报错中提示的id即可。 重启服务,jps查看进程是否有kafka

    相关文章

      网友评论

          本文标题:ELFK+Kafka集群搭建

          本文链接:https://www.haomeiwen.com/subject/jjcmrdtx.html