美文网首页
Kafka集群搭建

Kafka集群搭建

作者: 鋆坤 | 来源:发表于2020-07-10 19:09 被阅读0次

    Kafka之分布式消息队列

    一、简介

    Kafka官网

    二、生产集群搭建

    准备

    • 安装JDK的linux服务器三台(JDK版需要与Kafka版本对应好)
    • ​Zookeeper集群

    下载

    准备好kafka安装包,官网下载地址

    创建目录

    cd /opt
    mkdir kafka       #创建安装目录
    cd kafka
    mkdir kafkalogs   #创建kafka消息目录,主要存放kafka消息
    

    解压

    tar -zxvf kafka_2.11-1.0.0.tgz -C /opt/kafka/
    

    修改配置

    cd /opt/kafka/kafka_2.11-1.0.0/config/
    vim server.properties
    
    broker.id=0  #集群内全局唯一,每台服务器设置不同的值
    listeners=PLAINTEXT://192.168.xx.xx:9092 #这个IP地址也是与本机相关的,每台服务器上设置为自己的IP地址
    log.dirs=/opt/kafka/kafkalogs #存放kafka消息
    zookeeper.connect=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 #zookeeper集群
    

    拷贝安装目录

    分发kafka目录到集群所有服务器,同时修改每个文件的broker.id和listeners

    启动

    #先启动zookeeper集群
    bin/kafka-server-start.sh -daemon config/server.properties
    

    测试

    创建主题 
    bin/kafka-topics.sh --create --bootstrap-server 192.168.xx.xx:9092 --replication-factor 3 --partitions 3 --topic test01
    
    查看主题列表
    bin/kafka-topics.sh --list --bootstrap-server 192.168.xx.xx:9092
            
    启动控制台生产者
    bin/kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test01
    
    启动控制台消费者        
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test01 --from-beginning
    
    在生产者控制台输入hello kafka
    

    配置详解

    broker.id=0
    #每个 broker 都可以用一个唯一的非负整数 id 进行标识; 这个 id 可以作为 broker 的“名字”, 并且它的存在使得 broker 无须混淆 consumers
    #就可以迁移到不同的 host/port 上。 你可以选择任意你喜欢的数字作为 id, 只要 id 是唯一的即可
    
    listeners=PLAINTEXT://192.168.xx.xx:9092
    #Kafka服务地址
    
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    #Kafka注册到Zookeeper的地址,内网不用设置默认使用listeners,
    #内外网环境
    #listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    #listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://192.168.xx.xx:9093
    #advertised.listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://<公网ip>:<端口>
    #inter.broker.listener.name=INTERNAL
    
    num.network.threads=3
    # broker 处理消息的最大线程数,一般情况下不需要去修改
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    socket.send.buffer.bytes=102400
    #SO_SNDBUFF 缓存大小, server 进行 socket连接所用
    
    socket.receive.buffer.bytes=102400
    #SO_RCVBUFF 缓存大小, server 进行 socket连接时所用
    
    socket.request.max.bytes=104857600
    #server允许的最大请求尺寸; 这将避免server溢出, 它应该小于 Java heap size
    
    log.dirs=/opt/kafka/kafkalogs
    #kafka 存放数据的路径。这个路径并不是唯一的,可以是多个, 路径之间只需要使用逗号分隔即可; 
    #每当创建新 partition 时, 都会选择在包含最少 partitions 的路径下进行。
    
    num.partitions=3
    #如果创建 topic 时没有给出划分 partitions 个数, 这个数字将是 topic 下 partitions 数目的默认数值。
    
    num.recovery.threads.per.data.dir=1
    #每个数据目录用来日志恢复的线程数目
    
    offsets.topic.replication.factor=3
    #topic 的 offset 的备份份数。 建议设置更高的数字保证更高的可用性
    transaction.state.log.replication.factor=3
    #事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求。
    transaction.state.log.min.isr=3
    #覆盖事务主题的min.insync.replicas配置。
    
    
    #log.flush.interval.messages=10000
    #此项配置指定时间间隔: 强制进行 fsync日志。 例如, 如果这个选项设置为 1, 那么每条消息之后都需要进行 fsync, 
    #如果设置为 5, 则每 5 条消息就需要进行一次fsync。 一般来说, 建议你不要设置这个值。 
    
    #log.flush.interval.ms=1000
    #此项配置用来置顶强制进行 fsync 日志到磁盘的时间间隔; 例如, 如果设置为1000, 那么每 1000ms 就需要进行一次fsync。 一般不建议使用这选项
    
    log.retention.hours=168
    #log.retention.bytes=1073741824
    #每个日志文件删除之前保存的时间。 默认数据保存时间对所有 topic 都一样。
    #log.retention.hours和log.retention.bytes 都是用来设置删除日志文件的, 无论哪个属性已经溢出。
    #这个属性设置可以在 topic 基本设置时进行覆盖。
    
    log.segment.bytes=1073741824
    #topic partition 的日志存放在某个目录下诸多文件中, 这些文件将 partition 的日志切分成一段一段的; 这个属性就是每个文件的最大尺寸;
    #当尺寸达到这个数值时, 就会创建新文件。 此设置可以由每个 topic 基础设置时进行覆盖。
    
    log.retention.check.interval.ms=300000
    #检查日志分段文件的间隔时间, 以确定是否文件属性到达删除要求
    
    zookeeper.connect=10.160.22.1:2181,10.160.22.2:2181,10.160.22.3:2181
    #zookeeper集群
    
    zookeeper.connection.timeout.ms=6000
    #客户端等待和 zookeeper 建立连接的最大时间
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    #主要作用是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数=10000。
    

    三、Zookeeper存储结构详解

    kafka 元数据存储目录

    admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)
    brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息
    cluster: 存储 kafka 集群信息
    config: 存储 broker,client,topic,user 以及 changer 相关的配置信息
    consumers: 存放消费者相关信息 (一般为空)
    controller: 用于存放控制节点信息 (注意:该节点是一个临时节点,用于 controller 节点注册)
    controller_epoch: 用于存放 controller 节点当前的年龄
    isr_change_notification: 用于存储 isr 的变更通知 (临时节点,当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更)
    latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围
    log_dir_event_notification: 日志目录事件通知
    

    admin 目录结构

    [zk: 127.0.0.1:2181(CONNECTED) 2] ls /kafka/admin
    [delete_topics]
    [zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka/admin/delete_topics
    []
    [zk: 127.0.0.1:2181(CONNECTED) 4] get  /kafka/admin/delete_topics
    null
    

    brokers 目录结构

    # broker和topic列表数据
    [zk: 127.0.0.1:2181(CONNECTED) 5] ls /kafka/brokers
    [ids, seqid, topics]
    
    # 表示当前集群有3个节点
    [zk: 127.0.0.1:2181(CONNECTED) 6] ls /kafka/brokers/ids
    [1, 2, 3]
    [zk: 127.0.0.1:2181(CONNECTED) 7] ls /kafka/brokers/seqid
    []
    # 表示集群当前的topic信息
    [zk: 127.0.0.1:2181(CONNECTED) 8] ls /kafka/brokers/topics
    [__consumer_offsets, appjsonlog_heartbeat, nginx_log,ingress_log ]
    
    # broker详情
    [zk: 127.0.0.1:2181(CONNECTED) 10] get  /kafka/brokers/ids/1
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":9999,"host":"10.0.0.1","timestamp":"1588925944886","port":9092,"version":4}
    
    # topic详情
    [zk: 127.0.0.1:2181(CONNECTED) 11] get  /kafka/brokers/topics/__consumer_offsets
    {"version":2,"partitions":{"1":[2,1,3],"0":[3,2,1],"2":[1,2,3],"adding_replicas":{},"removing_replicas":{}}
    
    
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
    [partitions]
    [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
    [0, 1, 2]
    
    # 查看topic某个分区的状态详情
    [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/1
    [state]
    [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/1/state
    []
    [zk: localhost:2181(CONNECTED) 8] get  /brokers/topics/__consumer_offsets/partitions/1/state
    {"controller_epoch":3,"leader":1,"version":1,"leader_epoch":12,"isr":[3,1]}
    

    cluster 目录结构

    [zk: 127.0.0.1:2181(CONNECTED) 5] get   /kafka/cluster/id
    {"version":"1","id":"5C-JZf4vRdqKzlca7Lv7pA"}
    cZxid = 0x100000018
    ctime = Fri Mar 30 22:21:07 CST 2018
    mZxid = 0x100000018
    mtime = Fri Mar 30 22:21:07 CST 2018
    pZxid = 0x100000018
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 45
    numChildren = 0
    

    config 目录结构

    [zk: 127.0.0.1:2181(CONNECTED) 12] ls /kafka/config
    [brokers, changes, clients, topics, users]
    
    # 需要注意的是,config目录用来存放各种实体的配置,用于使用kafka相关工具对实体进行的配置变更存储
    # 因此,一般在kafka集群运行后,如不设置相关动态参数,该目录下的配置一般为空
    [zk: 127.0.0.1:2181(CONNECTED) 19] ls /kafka/config/brokers
    []
    [zk: 127.0.0.1:2181(CONNECTED) 20] ls /kafka/config/changes
    []
    [zk: 127.0.0.1:2181(CONNECTED) 21] ls /kafka/config/clients
    []
    [zk: 127.0.0.1:2181(CONNECTED) 22] ls /kafka/config/topics
    [__consumer_offsets]
    [zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka/config/users
    []
    
    # 可以看到,存储消费者偏移量的topic在配置中
    # 是因为该topic有一些额外的topic级别参数
    # 如果我们对topic参数有过动态变更,将会在这里存储
    [zk: 127.0.0.1:2181(CONNECTED) 24] get /kafka/config/topics/__consumer_offsets
    {"version":1,"config":{"segment.bytes":"104857600","compression.type":"producer","cleanup.policy":"compact"}}
    

    controller 目录结构

    # 可以看到当前broker-1为集群的controller节点
    [zk: 127.0.0.1:2181(CONNECTED) 30] get  /kafka/controller
    {"version":1,"brokerid":1,"timestamp":"1588833869354"}
    

    controller_epoch 目录结构

    # 可以看到controller的年龄是2,说明controller经历过2次变更了
    [zk: 127.0.0.1:2181(CONNECTED) 32] get  /kafka/controller_epoch
    2
    

    四、Kafka原理文章推荐

    震惊了!原来这才是kafka!

    五、滴滴开源的kafka-manager编译及部署

    官方GitHub
    官方用户手册

    准备

    • 安装JDK1.8+的linux服务器
    • ​Mysql(Drds)
    • ​Mysql建库kafka-manager,执行初始化脚本 create_mysql_table.sql

    编译

    • 下载源码
    • 修改默认管理数据库驱动
    #进入下载的源码目录/kafka-manager-master/dao修改pom.xml文件,新增mysql8的依赖,如果需要mysql5也是一样的方法。
    
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.13</version>
    </dependency>
    
    #修改之后到/kafka-manager-master目录下进入cmd命令,执行mvn install,等待一会即可编译成功。
    #然后进入kafka-manager-master\web\src\main\resources下将application.yml 以及kafka-manager-master\web\target下的编译成功后的kafka-manager-web-1.0.0-SNAPSHOT.jar上传到服务器。
    
    • 修改application.yml配置文件
    server:
      port: 8080
      tomcat:
        accept-count: 100
        max-connections: 1000
        max-threads: 20
        min-spare-threads: 20
     
    spring:
      application:
        name: kafkamanager
      datasource:
        kafka-manager:
          jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
          username: root
          password: 123456
          driver-class-name: com.mysql.jdbc.Driver #MYSQL8 需要在dao文件夹内修改pom.xml里的依赖为mysql8然后重新打包
      main:
        allow-bean-definition-overriding: true
     
      profiles:
        active: dev
     
    logging:
      config: classpath:logback-spring.xml
     
    # kafka监控
    kafka-monitor:
      enabled: true
      notify-kafka:
        cluster-id: 95
        topic-name: kmo_monitor
    
    • 启动
    nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
    

    访问 http://localhost:8080,输入帐号及密码进行登录,默认账号是admin/admin(在管理数据库account表内也可以查到)

    相关文章

      网友评论

          本文标题:Kafka集群搭建

          本文链接:https://www.haomeiwen.com/subject/voqmcktx.html