美文网首页
Kafka应用总结

Kafka应用总结

作者: 筑梦之队 | 来源:发表于2020-09-14 11:36 被阅读0次

    在今年的两个项目中,为了处理大量的游戏日志,于是引入了kafka。经过一段时间的运行,现将整个应用的过程做一个记录和归档。

    关于kafka

    Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
    以上内容摘取自kafka官网,https://kafka.apache.org/
    学习kafka最核心的概念,最好的方式就是去官网。当然,要想看懂,需要英文有一定的基础。
    除了官网以外,我还推荐一门在线课程,极客时间上的:《Kafka核心技术与实战》。这是一门系统讲述如何在实际工作中使用kafka的课程。

    系统部署

    kafka支持两种部署方式,单机部署和集群部署。在生产环境中,我们自然会选择集群部署。集群的部署包括以下两个部分:部署zookeeper和kafka。由于kafka依赖zookeeper来支持分布式的共识机制,所以需要部署zookeeper;虽然kafka中内置了zookeeper,但是对于集群而言,最好是部署单独的zookeeper集群。

    部署zookeeper

    1. 准备好zookeeper集群的文件
    • zk_servers.txt
    172.27.0.4
    172.27.0.10
    172.27.0.13
    
    1. 在目标服务器集群中创建zookeeper账号
    • zk_create_user_mgr.sh
    #!/bin/bash
    
    server_list_file=$1
    if [ -z $server_list_file ]; then
        echo "Usage: ./zk_create_user_mgr.sh server_list_file"
        exit 1
    fi
    
    id=1
    for server in `cat $server_list_file`; do
        echo $id":"$server
        scp zk_create_user.sh root@$server:/root
        ssh root@$server '/root/zk_create_user.sh'
        id=$(($id+1))
    done
    
    • zk_create_user.sh
    #!/bin/bash
    
    adduser -d /home/zookeeper -m -s /bin/bash zookeeper
    mkdir /home/zookeeper/.ssh
    cp /root/.ssh/authorized_keys /home/zookeeper/.ssh/authorized_keys
    chown zookeeper:zookeeper /home/zookeeper/.ssh
    chown zookeeper:zookeeper /home/zookeeper/.ssh/authorized_keys
    
    1. 在目标服务器集群中部署zookeeper服务
    • zk_init_mgr.sh
    #!/bin/bash
    
    zk_server_list_file=$1
    if [ -z $zk_server_list_file ]; then
        echo "Usage:./zk_init_mgr.sh zk_server_list_file"
        exit 1
    fi
    
    count=`cat $zk_server_list_file | wc -l`
    if [ $count -ne 3 ]; then
        echo "ZooKeeper should have 3 servers"
        exit 1
    fi
    
    zk_server1=`head -1 $zk_server_list_file | tail -1`
    zk_server2=`head -2 $zk_server_list_file | tail -1`
    zk_server3=`head -3 $zk_server_list_file | tail -1`
    
    id=1
    for server in `cat $zk_server_list_file`; do
        echo $id":"$server
        scp jdk-8u231-linux-x64.tar.gz zookeeper@$server:/home/zookeeper
        scp apache-zookeeper-3.6.1-bin.tar.gz zookeeper@$server:/home/zookeeper
        scp zk_init.sh zookeeper@$server:/home/zookeeper
        ssh zookeeper@$server "/home/zookeeper/zk_init.sh $id $zk_server1 $zk_server2 $zk_server3"
        id=$(($id+1))
    done
    
    • zk_init.sh
    #!/bin/bash
    echo "init zookeeper..."
    
    # Decompress
    echo "Decomparess file"
    if [ ! -d jdk1.8.0_231 ]; then
        tar -xf jdk-8u231-linux-x64.tar.gz
        echo "PATH=$PATH:/home/zookeeper/jdk1.8.0_231/bin" >> /home/zookeeper/.bash_profile
        echo "export PATH" >> /home/zookeeper/.bash_profile
    fi
    
    if [ ! -d apache-zookeeper-3.6.1-bin ]; then
        tar -xf apache-zookeeper-3.6.1-bin.tar.gz
    fi
    
    # Set environment
    source /home/zookeeper/.bash_profile
    
    # Handle
    
    echo "Set the config file"
    cat << EOF > /home/zookeeper/apache-zookeeper-3.6.1-bin/conf/zoo.cfg
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/home/zookeeper/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    
    server.1=$2:2888:3888
    server.2=$3:2888:3888
    server.3=$4:2888:3888
    
    EOF
    
    echo "Set the myid file"
    mkdir -p /home/zookeeper/data
    echo $1 > /home/zookeeper/data/myid
    
    # Start the zookeeper
    /home/zookeeper/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start
    

    部署kafka

    1. 准备好kafka集群的文件
    • kafka_servers.txt
    172.27.0.4
    172.27.0.10
    172.27.0.13
    
    1. 在目标服务器集群中创建kafka账号
      . kafka_create_user_mgr.sh
    #!/bin/bash
    
    kafka_server_list_file=$1
    if [ -z $kafka_server_list_file ]; then
        echo "Usage: ./kafka_create_user_mgr.sh kafka_server_list_file"
        exit 1
    fi
    
    id=1
    for server in `cat $kafka_server_list_file`; do
        echo $id":"$server
        scp kafka_create_user.sh root@$server:/root
        ssh root@$server '/root/kafka_create_user.sh'
        id=$(($id+1))
    done
    

    . kafka_create_user.sh

    #!/bin/bash
    
    adduser -d /home/kafka -m -s /bin/bash kafka
    mkdir /home/kafka/.ssh
    cp /root/.ssh/authorized_keys /home/kafka/.ssh/authorized_keys
    chown kafka:kafka /home/kafka/.ssh
    chown kafka:kafka /home/kafka/.ssh/authorized_keys
    
    1. 在目标服务器集群中部署kafka服务
    • kafka_init_mgr.sh
    #!/bin/bash
    
    zk_server_list_file=$1
    if [ -z $zk_server_list_file ]; then
        echo "Usage:./kafka_init_mgr.sh zk_server_list_file kafka_server_list_file"
        exit 1
    fi
    
    count=`cat $zk_server_list_file | wc -l`
    if [ $count -ne 3 ]; then
        echo "ZooKeeper should have 3 servers"
        exit 1
    fi
    
    zk_server1=`head -1 $zk_server_list_file | tail -1`
    zk_server2=`head -2 $zk_server_list_file | tail -1`
    zk_server3=`head -3 $zk_server_list_file | tail -1`
    
    kafka_server_list_file=$2
    if [ -z $kafka_server_list_file ]; then
        echo "Usage:./kafka_init_mgr.sh zk_server_list_file kafka_server_list_file"
        exit 1
    fi
    
    id=1
    for server in `cat $kafka_server_list_file`; do
        echo $id":"$server
        scp jdk-8u231-linux-x64.tar.gz kafka@$server:/home/kafka
        scp kafka_2.12-2.5.0.tgz kafka@$server:/home/kafka
        scp kafka_init.sh kafka@$server:/home/kafka
        ssh kafka@$server "/home/kafka/kafka_init.sh $id $server $zk_server1 $zk_server2 $zk_server3"
        id=$(($id+1))
    done
    
    • kafka_init.sh
    #!/bin/bash
    echo "init kafka..."
    
    # Decompress
    echo "Decomparess file"
    if [ ! -d jdk1.8.0_231 ]; then
        tar -xf jdk-8u231-linux-x64.tar.gz
        echo "PATH=$PATH:/home/kafka/jdk1.8.0_231/bin" >> /home/kafka/.bash_profile
        echo "export PATH" >> /home/kafka/.bash_profile
    fi
    
    if [ ! -d kafka_2.12-2.5.0 ]; then
        tar -xf kafka_2.12-2.5.0.tgz
    fi
    
    # Set environment
    source /home/kafka/.bash_profile
    
    # Handle
    
    echo "Set the config file"
    cat << EOF > /home/kafka/kafka_2.12-2.5.0/config/server.properties
    broker.id=$1
    
    #=======Storage configuration=========
    log.dirs=/home/kafka/kafka-logs
    
    #=======ZooKeeper configuration=======
    zookeeper.connect=$3:2181,$4:2181,$5:2181
    zookeeper.connection.timeout.ms=18000
    
    #=======Connection configuration======
    listeners=PLAINTEXT://$2:9092
    
    #=======Topic configuration===========
    auto.create.topics.enable=false
    unclean.leader.election.enable=false
    auto.leader.rebalance.enable=false
    
    #=======Data save configuration=======
    log.retention.hours=12
    log.retention.bytes=-1
    message.max.bytes=10000120
    log.segment.bytes=104857600
    log.retention.check.interval.ms=300000
    
    #=======Transaction configuration=====
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=1
    
    #=======Offset configuration=========
    offsets.topic.replication.factor=3
    
    #=======Other configuration==========
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    num.recovery.threads.per.data.dir=1
    group.initial.rebalance.delay.ms=0
    EOF
    
    # Start the kafka
    /home/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /home/kafka/kafka_2.12-2.5.0/config/server.properties
    

    动态调整kafka集群

    在kafka运行过程中,可能会需要动态调整kafka集群,包括升级服务器、增加服务器、删除服务器、调整broker配置参数、调整Topic配置参数。在项目的运行周期中,我做过五次调整。

    1. 调整所有broker的配置
      我们使用了两种类型的Topic:全局和Topic和每个区服一个的Topic。
      最初,每个区服一个的Topic设置的日志保存时间为72小时,全局的Topic设置的保存时间是24小时;但是后面,由于最早规划的硬盘不足,再加上在运行过程中没有出过任何问题,所以就计划将日志保存时间缩短。
      对于每个区服一个Topic的情况来说,由于区服较多,导致Topic有几千个;于是没有办法一个一个地去调整Topic的参数,于是就动态调整broker的参数。命令如下:
    bin/kafka-configs.sh --bootstrap-server $kafka_host:$port --entity-type brokers --entity-default --alter --add-config log.retention.ms=43200000
    

    调整完成后,检查一下看是否生效:

    bin/kafka-configs.sh --bootstrap-server $kafka_host:$port --entity-type brokers --entity-default --describe
    
    1. 调整单个topic的配置
      而对于全局的Topic,由于数量少(我们总共只用了两个),同时它的日志保存的时间比其它Topic更短,所以进行单独调整。命令如下:
    bin/kafka-configs.sh --bootstrap-server $kafka_host:$port --entity-type topics --entity-name $topic_name --alter --add-config retention.ms=3600000
    

    调整完成后,检查一下看是否生效:

    bin/kafka-configs.sh --bootstrap-server $kafka_host:$port --entity-type topics --entity-name $topic_name --describe
    
    1. 升级服务器
      在一次大规模的推广前一小时,忽然收到服务器性能的报警,CPU达到80%多了。于是乎,立即决定对现有服务器进行升级。
      由于kafka是一个分布式、高可用的集群,并且在做服务器配置时,设置了全局的1主2从的结构。于是,升级就变得很简单了,完全可以进行滚动升级,整个集群没有任何的down time。
      操作如下:
      从第一台机器开始,关机,升级,启动,检查状态;如果成功,则对下一台进行同样方式的升级。
    2. 增加服务器
      当第二波推广高峰到来前,经过评估,现有的服务器无法支撑新的推广,于是决定增加3台机器。当把3台新的机器添加到集群中后,我们就面临着kafka的一个不足,对于现有的Topic,kafka不会移动它们到新的broker上,即便它们非常空闲。
      这时,我们有两个方案:
    • 第一、将一些现有的Topic转移到新的broker上;
    • 第二、将尚未开服的服务器对应的Topic删除,然后重启,让它们在新的broker上生成Topic。
      最后,我们选择了第二种。
    1. 回收服务器
      当我们发现系统有非常大的空闲资源后,决定回收一些服务器。但是面临着一些问题。
      前3台回收时,非常顺利;但是当回收第4、5台时,由于涉及到kafka的rebalance,导致回收过程非常慢,程序中出现了大量的错误日志;于是,暂时放弃回收第4、5台。等待业务空闲的时候再进行。

    相关文章

      网友评论

          本文标题:Kafka应用总结

          本文链接:https://www.haomeiwen.com/subject/yhgaektx.html