美文网首页玩转大数据spark大数据
kafka集群搭建、避坑指南以及常用操作

kafka集群搭建、避坑指南以及常用操作

作者: 东皇Amrzs | 来源:发表于2016-09-29 11:51 被阅读4034次

    首先,kafka的搭建是建立在zookeeper的基础上的,不懂zookeeper怎么搭建的可以先看我上一篇文章
    zookeeper环境搭建以及避坑指南

    一、下载并解压kafka。

    可以从 http://kafka.apache.org/downloads.html 这里下载最新的kafka镜像放到/usr/local/kafka目录里

    mkdir /usr/local/kafka
    cd /usr/local/kafka  
    tar -zxvf kafka_2.10-0.8.2.2.tgz 
    

    二、创建日志目录

    kafka的日志目录正常情况下会很大(连续跑一周上百G很容易),可以专门挂载一个盘来存储kafka的目录,这里我挂载到了/data目,当然,如果你是用了我下面的配置参数,那每个日志文件最多也就1个G,除非你的业务需要日志文件保留很长时间,否则就按照第三步的配置来就行了

    mkdir /data/kafka-logs   
    

    三、修改配置文件(列出来的都是可避坑配置项,血与泪的教训)

    kafka的配置文件为conf/server.properties修改此配置文件即可

    注意,只有broker.idlisterners的要和具体的节点相对应,剩下的是直接相同的。下文中列出来的都是需要修改或者添加的

    ############################# Server Basics #############################
    broker.id=0
    ############################# Socket Server Settings #############################
    listeners=PLAINTEXT://192.168.1.150:9092 #如果不配置,默认取PLAINTEXT://your.host.name:9092
    ############################# Log Basics #############################
    log.dirs=/home/xzp/kafka-logs
    delete.topic.enable=true                 #通过配置此项使得删除topic的指令生效
    ############################# Log Retention Policy #############################
    # The minimum age of a log file to be eligible for deletion
    log.cleanup.policy=delete # 日志的清除策略:直接删除
    log.retention.hours=72  # 日志保存时间为3天 
    log.segment.bytes=1073741824 # 每个日志文件的最大的大小,这里为1GB
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    #配置zookeeper集群url,这里很重要
    zookeeper.connect=192.168.1.145:2181,192.168.1.150:2181,192.168.1.138:2181
      
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    

    尤其注意这里的 Log Retention Policy配置项,清除无用的日志文件很重要.

    四、启动kafka:

    这里注意,记得要先启动zookeeper。确保zookeeper启动以后再执行kafka的启动命令:

    QuorumPeerMain就是zookeeper进程
    bin/kafka-server-start.sh -daemon config/server.properties
    

    七、检测是否成功启动:

    有kafka进程存在即成功启动

    八、kafka常用操作流程:

    Step 1: Start the server
    后台方式启动,推荐第一次配置的新手不要加入-daemon参数,看看控制台输出的是否有success.

    bin/kafka-server-start.sh -daemon config/server.properties 
    

    Step 2: Create a topic(replication-factor一定要大于1,否则kafka只有一份数据,leader一旦崩溃程序就没有输入源了,分区数目视输入源而定)

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic nodeHlsTest
    

    Step 3: Describe a topic

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic nodeHlsTest
    

    step 3: list the topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    step 4: send some message

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nodeHlsTest
    

    step 5: start a consumer

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nodeHlsTest --from-beginning
    

    step 6: delete a topic
    要事先在 serve.properties 配置 delete.topic.enable=true

    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic nodeHlsTest
    # 如果仍然只是仅仅被标记了删除(zk中并没有被删除),那么启动zkCli.sh,输入如下指令
    rmr /brokers/topics/nodeHlsTest
    

    相关文章

      网友评论

        本文标题:kafka集群搭建、避坑指南以及常用操作

        本文链接:https://www.haomeiwen.com/subject/opmkyttx.html