美文网首页
kafka 集群搭建

kafka 集群搭建

作者: 远方的猫dj | 来源:发表于2018-08-29 22:25 被阅读0次

    本文用以记录 kafka 集群安装

    基础环境

    jdk 1.8
    zookeeper-3.4.13
    kafka_2.11-0.11.0.0

    1、设置host、安装 jdk 和免密登录

    参照:hadoop 集群搭建

    2、安装 zookeeper 集群

    参照:Hadoop、Hbase HA 高可用集群搭建

    • zookeeper 启停脚本: ./start.sh start
    #!/bin/bash
    # 启动 ./start.sh start  
    # 停止 ./start.sh stop
    #参数传递
    usage="Usage: $0 (start|stop|status)"
    if [ $# -lt 1 ]; then
      echo $usage
      exit 1
    fi
    behave=$1
    echo "$behave zkServer cluster"
    #主机名称
    for i in 1 2 3
    do
    #使用ssh进行启动
    ssh kafka$i "/home/zookeeper/zookeeper-3.4.13/bin/zkServer.sh $behave"
    done
    exit 0
    

    3、安装 kafka 集群

    • 下载 kafka下载地址 下载,我这里选择2.11-0.11.0.0版本,通过ssh上传到服务器
    • 解压 tar -zxvf kafka_2.11-0.11.0.0.tgz
    • 设置环境变量 vim /etc/profile
    export KAFKA_HOME=/home/kafka/kafka_2.11-0.11.0.0
    export PATH=$KAFKA_HOME/bin:$PATH
    #配置生效: source /etc/profile
    
    • 修改配置文件
      进入到config目录 cd /home/kafka/kafka_2.11-0.11.0.0/config/
    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,每台服务器的broker.id都不能相同
    port=19092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
    
    • 复制到其他服务器
    scp -r /home/kafka/kafka_2.11-0.11.0.0 kafka2: /home/kafka
    scp -r /home/kafka/kafka_2.11-0.11.0.0 kafka3: /home/kafka
    #修改broker.id、host.name
    

    4、启动Kafka集群并测试

    • 启动服务
    #1.启动zookeeper集群(3台都需要启动)
    cd /home/zookeeper/zookeeper-3.4.13/bin
    ./zkServer.sh start
    #从后台启动Kafka集群(3台都需要启动)
     cd /home/kafka/kafka_2.11-0.11.0.0/bin #进入到kafka的bin目录 
    ./kafka-server-start.sh  ../config/server.properties
    
    • 检查服务是否启动
    #执行命令jps
    20348 Jps 
    4233 QuorumPeerMain 
    18991 Kafka
    
    • 创建Topic来验证是否创建成功
    #创建Topic
    ./kafka-topics.sh --create --zookeeper 192.168.7.100:12181 --replication-factor 2 --partitions 1 --topic shuaige
    #解释
    --replication-factor 2   #复制两份
    --partitions 1 #创建1个分区
    --topic #主题为shuaige
    
    '''在一台服务器上创建一个发布者'''
    #创建一个broker,发布者
    ./kafka-console-producer.sh --broker-list 192.168.7.100:19092 --topic shuaige
    
    '''在一台服务器上创建一个订阅者'''
    ./kafka-console-consumer.sh --zookeeper localhost:12181 --topic shuaige --from-beginning
    
    • kafka 启停脚本:
      注:kafka停止脚本要调用kafka官方的kafka-server-stop.sh脚本。 但是官方的kafka-server-stop.sh 脚本是优点问题,不会真正的停止kafka。所以要先修改kafka-server-stop.sh
    PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
    修改为:
          PIDS=$(jps -lm | grep -i 'kafka\.Kafka' | awk '{print $1}')
    
    #启动:
    #!/bin/bash
    
    brokers="kafka1 kafka2 kafka3"
    kafka_home="/home/kafka/kafka_2.11-0.11.0.0"
    
    for i in $brokers
    do
        echo "Starting kafka on ${i} ... "
        ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}
    /config/server.properties > /dev/null 2>&1 &"
        if [[ $? -ne 0 ]]; then
            echo "Start kafka on ${i} is OK !"
        fi
    done
    echo kafka kafka are started !
    exit 0
    
    #停止
    #!/bin/bash 
    
    brokers="kafka1 kafka2 kafka3"
    kafka_home="/home/kafka/kafka_2.11-0.11.0.0"
    
    for i in $brokers
    do
        echo "Stopping kafka on ${i} ..."
        ssh ${i} "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh"
        if [[ $? -ne 0 ]]; then
            echo "Stopping ${kafka_home} on ${i} is down"
        fi
    done
    
    echo all kafka  are stopped !
    exit 0
    

    更多请看官方文档:http://kafka.apache.org/documentation.html

    测试(在发布者那里发布消息看看订阅者那里是否能正常收到~)

    • yahoo kafka 管理平台 安装

    kafka-manager
    下载编译好的,修改配置文件zookeeper参数,启动运行
    注:关闭重启时,要删除RUNNUING文件


    5、 其他命令
    大部分命令可以去官方文档查看

    • 查看topic
    ./kafka-topics.sh --list --zookeeper localhost:12181
    #就会显示我们创建的所有topic
    
    • 查看topic状态
    /kafka-topics.sh --describe --zookeeper localhost:12181 --topic shuaige
    #下面是显示信息
    Topic:ssports    PartitionCount:1    ReplicationFactor:2    Configs:
        Topic: shuaige    Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1
    #分区为为1  复制因子为2   他的  shuaige的分区为0 
    #Replicas: 0,1   复制的为0,1
    

    OK kafka集群搭建完毕

    6、其他说明标注

    • 日志说明

    默认kafka的日志是保存在/opt/kafka/kafka_2.10-0.9.0.0/logs目录下的,这里说几个需要注意的日志

    server.log #kafka的运行日志
    state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
    
    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
    
    • 上面的大家你完成之后可以登录zk来查看zk的目录情况
    #使用客户端进入zk
    ./zkCli.sh -server 127.0.0.1:12181  #默认是不用加’-server‘参数的因为我们修改了他的端口
    
    #查看目录情况 执行“ls /”
    [zk: 127.0.0.1:12181(CONNECTED) 0] ls /
    
    #显示结果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
    '''
    上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的
    '''
    
    #标注一个重要的
    [zk: 127.0.0.1:2181(CONNECTED) 1] get /brokers/ids/0
    {"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092}
    cZxid = 0x1000001c1
    ctime = Mon Feb 22 15:26:03 CST 2016
    mZxid = 0x1000001c1
    mtime = Mon Feb 22 15:26:03 CST 2016
    pZxid = 0x1000001c1
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x152e40aead20016
    dataLength = 139
    numChildren = 0
    [zk: 127.0.0.1:12181(CONNECTED) 2] 
    
    #还有一个是查看partion
    [zk: 127.0.0.1:2181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
    null
    cZxid = 0x100000029
    ctime = Mon Feb 22 10:05:11 CST 2016
    mZxid = 0x100000029
    mtime = Mon Feb 22 10:05:11 CST 2016
    pZxid = 0x10000002a
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 0
    numChildren = 1
    [zk: 127.0.0.1:12181(CONNECTED) 8] 
    

    相关文章

      网友评论

          本文标题:kafka 集群搭建

          本文链接:https://www.haomeiwen.com/subject/koxqwftx.html