Centos7 搭建Kafka集群

作者: tianwen01 | 来源:发表于2018-10-13 13:34 被阅读26次

    1. 准备

    #安装Java运行环境JRE,下载Server Jre上传到tmp目录
    mkdir /usr/java;\
    tar xf /tmp/server-jre-8u181-linux-x64.tar.gz -C /usr/java;\
    ln -s /usr/java/jdk1.8.0_181 /usr/java/default;\
    tee -a /etc/profile << 'EOF'
    export JAVA_HOME=/usr/java/default
    export PATH=$JAVA_HOME/bin:$PATH
    EOF
    
    #生效环境变量
    source /etc/profile
    
    #下载部署包
    wget -c -P /tmp/ https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.1/kafka_2.11-1.1.1.tgz;\
    mkdir /opt/kafka;\
    tar xf /tmp/kafka_2.11-1.1.1.tgz -C /opt/kafka
    
    #添加用户及目录
    useradd -s /sbin/nologin kafka;\
    mkdir -p /home/kafka/data/kafka /home/kafka/data/zookeeper
    

    2. Zookeeper

    2.1 配置zookeeper.properties

    mv /opt/kafka/kafka_2.11-1.1.1/config/zookeeper.properties /opt/kafka/kafka_2.11-1.1.1/config/zookeeper.properties_bak;\
    tee /opt/kafka/kafka_2.11-1.1.1/config/zookeeper.properties << 'EOF'
    #数据目录,不建议放到程序内,因为更新到时候容易丢,应该放在统一到数据目录
    dataDir=/home/kafka/data/zookeeper
    
    #zookeeper中使用的基本时间单位, 毫秒值
    tickTime=2000
    
    #这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个 tickTime 时间间隔数。这里设置为5表名最长容忍时间为 5 * 2000 = 10 秒
    initLimit=10
    
    #这个配置标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5 * 2000 = 10秒
    syncLimit=5
    
    #客户端连接的端口
    clientPort=2181
    
    #每个IP最大客户端连接数,0表示关闭限制
    maxClientCnxns=1000
    
    #节点信息
    server.41=192.168.1.41:2888:3888
    server.42=192.168.1.42:2888:3888
    server.43=192.168.1.43:2888:3888
    EOF
    

    2.2 配置myid

    #zookeeper node-1
    echo "41"  > /home/kafka/data/zookeeper/myid
    
    #zookeper node-2
    echo "42"  > /home/kafka/data/zookeeper/myid
    
    #zookeeper node-3
    echo "43"  > /home/kafka/data/zookeeper/myid
    

    2.3 添加Systemd服务

    tee /etc/systemd/system/zookeeper.service << 'EOF'
    [Unit] 
    Description=Apache Zookeeper server (Kafka) 
    Documentation=http://zookeeper.apache.org 
    Requires=network.target remote-fs.target  
    After=network.target remote-fs.target 
    [Service] 
    Type=simple 
    User=kafka 
    Group=kafka 
    Environment=JAVA_HOME=/usr/java/default 
    ExecStart=/opt/kafka/kafka_2.11-1.1.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.11-1.1.1/config/zookeeper.properties 
    ExecStop=/opt/kafka/kafka_2.11-1.1.1/bin/zookeeper-server-stop.sh 
    [Install] 
    WantedBy=multi-user.target
    EOF
    

    2.4 启动

    chown -R kafka. /home/kafka/data /opt/kafka;\
    systemctl daemon-reload;\
    systemctl enable zookeeper;\
    systemctl start zookeeper;\
    systemctl status zookeeper
    

    2.5 打开防火墙

    firewall-cmd --add-port=2181/tcp --permanent;\
    firewall-cmd --add-port=2888/tcp --permanent;\
    firewall-cmd --add-port=3888/tcp --permanent;\
    firewall-cmd --reload
    

    2.6 查看

    /opt/kafka/kafka_2.11-1.1.1/bin/zookeeper-shell.sh 192.168.1.41:2181

    使用参考:https://my.oschina.net/tongyufu/blog/1806196

    3. Kafka

    3.1 配置server.properties

    mv /opt/kafka/kafka_2.11-1.1.1/config/server.properties /opt/kafka/kafka_2.11-1.1.1/config/server.properties_bak;\
    tee /opt/kafka/kafka_2.11-1.1.1/config/server.properties << 'EOF'
    #非负整数,用于唯一标识broker,不同节点该值必须不一样,比如 1,2,3
    broker.id=41
    
    # broker 发布给生产者消费者的hostname,会存储在zookeeper。配置好这个host可以实现内网外网同时访问。
    listeners=PLAINTEXT://192.168.1.41:9092
    
    #borker进行网络处理的线程数
    num.network.threads=3
    
    #borker进行I/O处理的线程数
    num.io.threads=8
    
    #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.send.buffer.bytes=102400
    
    #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.receive.buffer.bytes=102400
    
    #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    socket.request.max.bytes=104857600
    
    #kafka存放消息的目录
    log.dirs=/home/kafka/data/kafka/kafka-logs
    
    #每个topic默认partition数量,根据消费者实际情况配置,配置过小会影响消费性能
    num.partitions=30
    
    # kafka启动恢复日志,关闭前日志刷盘的线程数,如果是raid的盘的话适当增加,比如3
    num.recovery.threads.per.data.dir=2
    
    #是否允许删除topic,开发环境应该是true,测试及生产环境应该是false
    delete.topic.enable=true
    
    #每个topic的副本数量,默认为1,生产最好3个以上
    offsets.topic.replication.factor=3
    
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    
    #日志刷盘机制默认,10000条刷一次,1秒刷一次
    #log.flush.interval.messages=10000
    #log.flush.interval.ms=1000
    
    #日志保留时间,默认为7天,如果量大,可以酌情改小,比如三天
    log.retention.hours=168
    
    #日志保留大小,默认是保留1G,生产上可以配置为下面50G
    log.retention.bytes=53687091200
    
    #The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    #Zookeeper host和port ,这个看zookeeper环境而设置
    zookeeper.connect=192.168.1.41:2181,192.168.1.42:2181,192.168.1.43:2181
    
    #Zokeeper 连接超时时间
    zookeeper.connection.timeout.ms=6000
    
    #是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数10000
    group.initial.rebalance.delay.ms=10000
    
    EOF
    
    

    3.2 server.properties分节点特殊配置

    #kafka node-1
    sed -i '/broker.id=/c\broker.id=41' /opt/kafka/kafka_2.11-1.1.1/config/server.properties;\
    sed -i '/listeners=/c\listeners=PLAINTEXT://192.168.1.41:9092' /opt/kafka/kafka_2.11-1.1.1/config/server.properties
    
    #kafka node-2
    sed -i '/broker.id=/c\broker.id=42' /opt/kafka/kafka_2.11-1.1.1/config/server.properties;\
    sed -i '/listeners=/c\listeners=PLAINTEXT://192.168.1.42:9092' /opt/kafka/kafka_2.11-1.1.1/config/server.properties
    
    #kafka node-3
    sed -i '/broker.id=/c\broker.id=43' /opt/kafka/kafka_2.11-1.1.1/config/server.properties;\
    sed -i '/listeners=/c\listeners=PLAINTEXT://192.168.1.43:9092' /opt/kafka/kafka_2.11-1.1.1/config/server.properties
    

    3.3 添加Systemd服务

    tee /etc/systemd/system/kafka.service << 'EOF'
    [Unit] 
    Description=Apache Kafka server (broker) 
    Documentation=http://kafka.apache.org/documentation.html 
    Requires=network.target remote-fs.target  
    After=network.target remote-fs.target zookeeper.service 
    [Service] 
    Type=simple 
    User=kafka 
    Group=kafka 
    Environment=JAVA_HOME=/usr/java/default 
    ExecStart=/opt/kafka/kafka_2.11-1.1.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.11-1.1.1/config/server.properties 
    ExecStop=/opt/kafka/kafka_2.11-1.1.1/bin/kafka-server-stop.sh 
    [Install] 
    WantedBy=multi-user.target
    
    EOF
    

    3.4 启动

    chown -R kafka. /home/kafka/data /opt/kafka;\
    systemctl daemon-reload;\
    systemctl enable kafka;\
    systemctl start kafka;\
    systemctl status kafka
    

    3.5 防火墙

    firewall-cmd --add-port=9092/tcp --permanent;\
    firewall-cmd --reload
    

    3.6 查看

    
    #查看topic
    /opt/kafka/kafka_2.11-1.1.1/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    
    #创建topic
    /opt/kafka/kafka_2.11-1.1.1/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --replication-factor 1 --partitions 1 --topic test
    
    #删除topic
    /opt/kafka/kafka_2.11-1.1.1/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic test
    

    相关文章

      网友评论

        本文标题:Centos7 搭建Kafka集群

        本文链接:https://www.haomeiwen.com/subject/qqsoaftx.html