美文网首页
Kafka集群

Kafka集群

作者: 想成为大师的学徒小纪 | 来源:发表于2022-02-28 14:33 被阅读0次

    一、简介

    • Kafka简介

      Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(ElasticsearchHadoop等)

      Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

    • Zookeeper简介

      Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    二、kafka基本概念

    在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

    概念/对象 简单说明
    Broker Kafka节点
    Leader 用于处理消息的接收和消费等请求,也就是说producer是将消息push到leader,而consumer也是从leader上去poll消息
    Follower 主要用于备份消息数据,一个leader会有多个follower
    Topic 主题,用来承载消息
    Partition 分区,用于主题分片存储
    Producer 生产者,向主题发布消息的应用
    Consumer 消费者,从主题订阅消息的应用
    Consumer Group 消费者组,由多个消费者组成

    kafka集群拓扑图:

    img

    三、部署步骤

    由于 kafka 依赖 zookeeper 因此需要安装 zookeeper,而kafka是基于scala语言编写,scala又是基于 jdk的,因此需要安装 jdk

    本次搭建所用主机

    kafka01:10.81.0.100 kafka02:10.81.0.101 kafka03:10.81.0.102

    1、安装jdk8及以上版本

    <!== 所有主机都执行 ==>

    下载链接:https://www.oracle.com/java/technologies/javase/javase8u211-later-archive-downloads.html#license-lightbox

    $ tar zxf jdk8.tar.gz -C /usr/local
    $ echo 'export JAVA_HOME="/usr/local/jdk8"' >>/etc/profile
    $ echo 'export PATH="$JAVA_HOME/bin:$PATH"' >> /etc/profile
    $ source /etc/profile
    $ java -version
    

    2、Zookeeper集群部署

    <!== 所有主机都执行 ==>

    • 安装软件
    $ cd /usr/local/src && wget https://downloads.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
    $ tar zxf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local/
    $ cd .. && mv apache-zookeeper-3.5.9-bin/ zookeeper
    $ echo 'export ZOOKEEPER_HOME="/usr/local/zookeeper"' >>/etc/profile
    $ echo 'export PATN="$ZOOKEEPER_HOME/bin:$PATH"' >>/etc/profile
    $ source /etc/profile
    
    • 修改配置文件
    $ cd zookeeper/conf && cp -p zoo_sample.cfg zoo.cfg
    $ mkdir -p /data/zookeeper
    $ mkdir /var/log/zookeeper
    $ cat > zoo.cfg<<'EOF'
    dataDir=/data/zookeeper
    dataLogDir=/var/log/zookeeper/
    tickTime=2000
    initLimit=5
    syncLimit=2
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=0
    maxClientCnxns=1024
    #standaloneEnabled=true
    #admin.enableServer=true
    server.1=10.81.0.100:2888:3888
    server.2=10.81.0.101:2888:3888
    server.3=10.81.0.200:2888:3888
    clientPort=2181
    EOF
    

    <!== kafka01主机执行 ==>

    $ echo 1 > /data/zookeeper/myid
    

    <!== kafka02主机执行 ==>

    $ echo 2 > /data/zookeeper/myid
    

    <!== kafka03主机执行 ==>

    $ echo 3 > /data/zookeeper/myid
    
    • 设置systemd管理
    $ cat > /etc/systemd/system/zookeeper.service <<'EOF'
    [Unit]
    Description=zookeeper.service
    After=network.target
    
    [Service]
    User=zookeeper
    Type=forking
    Environment=ZOO_LOG_DIR=/var/log/zookeeper
    Environment=JAVA_HOME=/usr/local/jdk8
    ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
    ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
    ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
    Restart=on-failure
    StartLimitInterval=60
    StartLimitBurst=3000
    
    [Install]
    WantedBy=multi-user.target
    EOF
    $ groupadd zookeeper
    $ useradd -g zookeeper -M -s /sbin/nologin zookeeper
    $ chown -R zookeeper. /data/zookeeper/
    $ chown -R zookeeper. /var/log/zookeeper/
    $ systemctl daemon-reload
    $ systemctl start zookeeper
    $ systemctl enable zookeeper
    
    • 查看主节点
    $ /usr/local/zookeeper/bin/zkServer.sh status
    

    3、kafka集群部署

    <!== 所有主机都执行 ==>

    • 安装软件
    $ cd /usr/local/src && wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate
    $ tar xf kafka_2.12-3.0.0.tgz -C /usr/local
    $ cd .. && mv kafka_2.12-3.0.0 kafka
    $ echo 'export KAFKA_HOME=/usr/local/kafka' >>/etc/profile
    $ echo 'export PATH="$PATH:$KAFKA_HOME/bin"' >>/etc/profile
    $ source /etc/profile
    
    • 修改配置文件

    <!== kafka01主机执行 ==>

    $ cat >/usr/local/kafka/config/server.properties <<'EOF'
    # broker的全局唯一编号,不能重复
    broker.id=1
    
    # kafka对外提供服务的监听地址及端口
    listeners=PLAINTEXT://0.0.0.0:12315
    
    
    
    # 代理将向生产者和消费者公布主机名和端口。如果没有设置,
    # 如果已配置,它将使用“侦听器”的值。否则,它将使用该值 
    advertised.listeners=PLAINTEXT://10.81.0.100:12315
    
    # 处理网络请求的线程数量,也就是接收消息的线程数。
    # 接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
    num.network.threads=3
    
    # broker处理磁盘IO的线程数,数值应该大于你的硬盘数
    num.io.threads=8
    
    # socket的发送缓冲区,socket的调优参数SO_SNDBUFF
    socket.send.buffer.bytes=102400
    
    # socket的接受缓冲区,socket的调优参数SO_RCVBUFF
    socket.receive.buffer.bytes=102400
    
    # socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
    socket.request.max.bytes=104857600
    
    # kafka数据的存放地址,多个地址的话用逗号分割 
    # 如果指定了多个地址,那么broker就会根据"最少使用原则"
    # 把同一个分区数据片段放在同一个路径下
    log.dirs=/data/kafka
    
    # 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
    num.partitions=3
    
    # Kafka会使用可配置的线程池来处理日志片段:
    #    服务器正常启动,用于打开每个分区的日志片段
    #    服务器崩愤后重启,用于检查和截短每个分区的日志片段
    #    服务器正常关闭,用于关闭日志片段。
    # 默认情况下,每个日志目录只使用一个线程。
    # 因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作
    # 特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间
    # 设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。
    # 也就是说,如果num.recovery.threads.per.data.dir被设为8,井且log.dirs指定了3个路径,那么总共需要24个线程。
    num.recovery.threads.per.data.dir=1
    
    # 组元数据内部topic"__consumer_offsets" and "__transaction_state"的复制系数
    # 对于开发测试以外的任何测试,建议使用大于1的值来确保可用性,例如3。
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=3
    
    # 数据默认保存时间
    log.retention.hours=168
    
    # 基于大小的日志保留策略。将从日志中删除段,除非剩余的
    # 段位于log.retention.bytes下面。独立于log.retention.hours的功能。
    log.retention.bytes=10737418240
    
    # topic的分区是以一堆segment文件存储的,这个控制每个segment的大小
    # 大小达到指定的上限(默认是lGB)时,当前`segment`就会被关闭
    # 一个新的`segment`被打开
    log.segment.bytes=1073741824
    
    # 文件大小检查的周期时间
    log.retention.check.interval.ms=300000
    
    # zookeeper连接地址
    zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
    
    # zookeeper连接超时时间
    zookeeper.connection.timeout.ms=18000
    
    # 指定GroupCoordinator延迟初始使用者重新平衡的时间(以毫秒为单位)。
    # 官方建议生产环境设置为3秒
    group.initial.rebalance.delay.ms=0
    EOF
    

    <!== kafka02主机执行 ==>

    $ cat >/usr/local/kafka/config/server.properties <<'EOF'
    broker.id=2
    listeners=PLAINTEXT://0.0.0.0:12315
    advertised.listeners=PLAINTEXT://10.81.0.101:12315
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka
    num.partitions=3
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=3
    log.retention.hours=168
    log.retention.bytes=10737418240
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
    zookeeper.connection.timeout.ms=18000
    group.initial.rebalance.delay.ms=0
    EOF
    

    <!== kafka03主机执行 ==>

    $ cat >/usr/local/kafka/config/server.properties <<'EOF'
    broker.id=3
    listeners=PLAINTEXT://0.0.0.0:12315
    advertised.listeners=PLAINTEXT://10.81.0.200:12315
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka
    num.partitions=3
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=3
    log.retention.hours=168
    log.retention.bytes=10737418240
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
    zookeeper.connection.timeout.ms=18000
    group.initial.rebalance.delay.ms=0
    EOF
    
    • 创建数据目录并授权
    $ groupadd kafka
    $ useradd -g kafka -M -s /sbin/nologin kafka
    $ mkdir /data/kafka
    $ chown -R kafka. /data/kafka
    $ chown -R kafka. /usr/local/kafka
    
    • 设置systemd管理
    $ cat >/etc/systemd/system/kafka.service <<'EOF'
    [Unit]
    Description=Apache Kafka server (broker)
    Documentation=http://kafka.apache.org/documentation.html
    Requires=network.target
    After=zookeeper.service
     
    [Service]
    Type=forking
    User=kafka
    Group=kafka
    Environment=JAVA_HOME=/usr/local/jdk8
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
    # 启动失败后重启
    Restart=on-failure
    # 每次尝试重启间隔60秒
    StartLimitInterval=60
    # 最终尝试重启50次
    StartLimitBurst=50
    [Install]
    WantedBy=multi-user.target
    EOF
    $ systemctl daemon-reload
    $ systemctl start kafka
    $ systemctl status kafka
    $ systemctl enable kafka
    

    4、kafka开启jmx监控

    <!== 所有主机都执行 ==>

    • 修改kafka启动脚本
    $ vi /usr/local/kafka/bin/kafka-server-start.sh
    ...
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9999"
    fi
    ...
    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.rmi.server.hostname=xxx.xxx.xxx.xxx kafka.Kafka "$@"
    
    • 重启kafka
    $ systemctl restart kafka
    

    5、kafka-eagle监控部署

    • 安装软件
    $ cd /usr/local/src && wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz
    $ tar zxf v2.0.8.tar.gz
    $ tar zxf efak-web-2.0.8-bin.tar.gz -C /usr/local
    $ cd .. && mv efak-web-2.0.8 kafka-eagle
    
    • 修改配置文件
    $ cat >/usr/local/kafka-eagle/conf/system-config.properties <<'EOF'
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=10.81.0.100:2181,10.81.0.101:2181,10.81.0.200:2181
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    cluster1.efak.broker.size=20
    kafka.zk.limit.size=32
    efak.webui.port=8048
    cluster1.efak.offset.storage=kafka
    cluster2.efak.offset.storage=zk
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    efak.metrics.charts=true
    efak.metrics.retain=15
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    efak.topic.token=keadmin
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=kafka_eagle_user
    efak.password=123456
    EOF
    $ chown -R kafka. /usr/local/kafka-eagle
    
    • 当你的zookeeper版本是3.5以后,需要开启zkcli权限修复脚本内容
    $ vim /usr/local/zookeeper/bin/zkServer.sh
    ...
    // after 77 lines
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
    $ systemctl restart zookeeper
    
    • 设置systemd管理
    $ cat >/etc/systemd/system/kafka-eagle.service <<'EOF'
    [Unit]
    Description=Kafka Eagle
    After=kafka.service
    
    [Service]
    Environment=KE_HOME=/usr/local/kafka-eagle
    Environment=JAVA_HOME=/usr/local/jdk8
    User=kafka
    Group=kafka
    Type=forking
    ExecStart=/usr/local/kafka-eagle/bin/ke.sh start
    ExecReload=/usr/local/kafka-eagle/bin/ke.sh restart
    ExecStop=/usr/local/kafka-eagle/bin/ke.sh stop
    #启动失败后重启
    Restart=on-failure
    #每次尝试重启间隔60秒
    StartLimitInterval=60
    #最终尝试重启50次
    StartLimitBurst=50
    
    [Install]
    WantedBy=multi-user.target
    EOF
    $ systemctl daemon-reload
    $ systemctl start kafka-eagle
    $ systemctl status kafka-eagle
    $ systemctl enable kafka-eagle
    
    • 访问网页

    http://xxx.xxx.xxx.xxx:8048

    管理员用户admin

    管理员密码123456

    A330FC34-3C35-4fc5-A2BA-B11045B92876.png

    相关文章

      网友评论

          本文标题:Kafka集群

          本文链接:https://www.haomeiwen.com/subject/brylrrtx.html