Kafka集群

作者: 六弦极品 | 来源:发表于2019-01-19 11:37 被阅读60次

    Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。
    Zookeeper集群部署请查阅上节

    一、Kafka 集群下载安装

    1、下载包

    $ cd  /usr/local/services/src
    $ wget http://apache.communilink.net/kafka/2.1.0/kafka_2.12-2.1.0.tgz
    $ tar xvf kafka_2.12-2.1.0.tgz -C ../
    $ cd ../kafka_2.12-2.1.0/
    

    2、配置
    broker配置:
    broker配置文件为config/server.properties文件,配置内容主要分为以下几个模块,

    Server Basics

    Kafka server 基本配置
    broker.id:是kafka集群server的唯一标识。

    broker.id=1
    
    Socket Server Settings

    Kafka 网络相关配置

    listeners:由用户配置协议,ip,port。
    num.network.threads:这个是borker进行网络处理的线程数
    num.io.threads:这个是borker进行I/O处理的线程数
    socket.send.buffer.bytes: 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes:kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes:这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    其他配置项,开发测试环境可使用默认配置;生产环境推荐如下配置。

    listeners=PLAINTEXT://10.4.4.151:9092
    num.network.threads=8
    num.io.threads=8
    socket.send.buffer.bytes=1024000
    socket.receive.buffer.bytes=1024000
    socket.request.max.bytes=104857600
    
    Log Basics

    Kafka log 基本配置

    log.dirs:log文件存储路径
    num.partitions:topic默认的partitions数量。在创建topic时,一般会指定partitions数量,因此该配置项在上述条件下基本无用。为了防止在创建topic时,未指定partitions数量,因此推荐使用配置为3。
    其他配置推荐使用默认配置

    log.dirs=/data/kafka/kafka-sa
    num.partitions=3
    num.recovery.threads.per.data.dir=1
    
    Internal Topic Settings

    Kafka 内部topic配置

    开发测试环境推荐使用默认配置,均为1
    生产环境推荐如下配置,replication数量为3,isr数量为2。

    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    
    Log Flush Policy

    Kafka log 刷盘、落盘机制

    log.flush.interval.messages:日志落盘消息条数间隔,即每接收到一定条数消息,即进行log落盘。
    log.flush.interval.ms:日志落盘时间间隔,单位ms,即每隔一定时间,即进行log落盘。
    强烈推荐开发、测试、生产环境均采用默认值,即不配置该配置,交由操作系统自行决定何时落盘,以提升性能。
    若对消息高可靠性要求较高的应用系统,可针对topic级别的配置,配置该属性。

    Log Retention Policy

    Kafka log保留策略配置

    log.retention.hours:日志保留时间,单位小时。和log.retention.minutes两个配置只需配置一项。
    message.max.bytes:表示接受消息体的最大大小,单位是字节
    default.replication.factor:默认的备份的复制自动创建topics的个数
    replica.fetch.max.bytes:最大备份的拉取数量
    log.retention.bytes:日志保留大小。一topic的一partition下的所有日志大小总和达到该值,即进行日志清除任务。当日志保留时间或日志保留大小,任一条件满足即进行日志清除任务,-1表示不限制。
    log.segment.bytes:日志分段大小。即一topic的一partition下的所有日志会进行分段,达到该大小,即进行日志分段,滚动出新的日志文件。
    log.retention.check.interval.ms:日志保留策略定期检查时间间隔,单位ms。
    日志保留大小,保留时间以及日志分段大小可根据具体服务器磁盘空间大小,业务场景自行决定。

    log.retention.hours=3
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    log.retention.bytes=5368709120
    log.segment.bytes=536870912
    log.retention.check.interval.ms=300000
    
    Zookeeper

    Kafka zookeeper 配置

    zookeeper.connect:zk连接地址
    zookeeper.connection.timeout.ms:zk连接超时时间,默认6s。可根据具体的应用场景进行更改,特可采用如下配置。

    zookeeper.connect=10.4.4.151:2181,10.4.4.152:2181,10.4.4.153:2181
    zookeeper.connection.timeout.ms=60000
    
    Group Coordinator Settings

    Kafka consumer group 协调配置

    生产环境推荐配置3000
    开发测试环境推荐配置0

    group.initial.rebalance.delay.ms=3
    

    二、启动Kafka集群并测试

    1、系统服务启动配置

    $ cat /lib/systemd/system/kafka-sa.service 
    [Unit]
    Description=Apache kafka-sa-sales
    After=network.target
    
    [Service]
    Type=simple
    Environment=JAVA_HOME=/usr/local/services/jdk1.8.0_91
    PIDFile=PIDFile=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-sa.pid
    ExecStart=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-server-start.sh  /usr/local/services/kafka_2.12-2.1.0/config/server.properties
    ExecStop=/bin/kill -TERM ${MAINPID}
    User=user_00
    Group=users
    Restart=always
    RestartSec=20
    
    [Install]
    WantedBy=multi-user.target
    

    2、启动服务
    从后台启动Kafka集群(3台都需要启动)

    [root@localhost logs]# systemctl start kafka-sa.service
    

    3、检查服务是否启动

    [root@localhost logs]# jps
    32502 Kafka
    521 Jps
    23151 QuorumPeerMain
    

    4、创建Topic来验证是否创建成功

    # cd /usr/local/services/kafka_2.12-2.1.0/bin
    # ./kafka-topics.sh --create --zookeeper 10.4.4.151:2181 --replication-factor 2 --partitions 1 --topic basketball
    Created topic "basketball".
    
    #解释
    --replication-factor 2   #复制两份
    --partitions 1 #创建1个分区
    --topic #主题为basketball
    

    查看所有topic和topic 状态

    # ./kafka-topics.sh --list --zookeeper 10.4.4.151:2181
    basketball
    # ./kafka-topics.sh --describe --zookeeper 10.4.4.151:2181 --topic basketball
    Topic:basketball    PartitionCount:1    ReplicationFactor:2 Configs:
        Topic: basketball   Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1
    
    #分区为1  复制因子为2   Topic basketball的分区为0 
    #Replicas: 3,1   复制的为3,1
    
    ##创建一个broker,发布者发布消息
    ./kafka-console-producer.sh --broker-list 10.4.4.151:9092 --topic basketball
    >NBA
    
    ##在到另一台机器或同一台一台机器开一个终端创建一个消费者消费:
    ./kafka-console-consumer.sh --bootstrap-server 10.4.4.152:9092 --topic basketball --from-beginning
    NBA
    

    OKkafka集群搭建完毕

    日志说明

    server.log #kafka的运行日志
    state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
    
    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
    

    登录zk来查看zk的目录情况

    #使用客户端进入zk
    ./zkCli.sh -server 10.4.4.152:2181
    
    #查看目录情况 执行“ls /”
    [zk: 10.4.4.152:2181(CONNECTED) 0] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    [zk: 10.4.4.152:2181(CONNECTED) 1]
    
    #显示结果:[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    ##上面的显示结果中:只有zookeeper是zookeeper原生的,其他都是Kafka创建的
    
    #标注一个broker
    [zk: 10.4.4.152:2181(CONNECTED) 3] get /brokers/ids/3
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.4.4.153:9092"],"jmx_port":-1,"host":"10.4.4.153","timestamp":"1547877474320","port":9092,"version":4}
    cZxid = 0x600000182
    ctime = Sat Jan 19 13:57:53 CST 2019
    mZxid = 0x600000182
    mtime = Sat Jan 19 13:57:53 CST 2019
    pZxid = 0x600000182
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x300046cf32a0005
    dataLength = 190
    numChildren = 0
    
    #查看partion
    [zk: 10.4.4.152:2181(CONNECTED) 4] get /brokers/topics/basketball/partitions/0
    null
    cZxid = 0x60000019f
    ctime = Sat Jan 19 14:07:05 CST 2019
    mZxid = 0x60000019f
    mtime = Sat Jan 19 14:07:05 CST 2019
    pZxid = 0x6000001a0
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1
    

    相关文章

      网友评论

        本文标题:Kafka集群

        本文链接:https://www.haomeiwen.com/subject/lrxwdqtx.html