首先,kafka的搭建是建立在zookeeper的基础上的,不懂zookeeper怎么搭建的可以先看我上一篇文章
zookeeper环境搭建以及避坑指南
一、下载并解压kafka。
可以从 http://kafka.apache.org/downloads.html 这里下载最新的kafka镜像放到/usr/local/kafka
目录里
mkdir /usr/local/kafka
cd /usr/local/kafka
tar -zxvf kafka_2.10-0.8.2.2.tgz
二、创建日志目录
kafka的日志目录正常情况下会很大(连续跑一周上百G很容易),可以专门挂载一个盘来存储kafka的目录,这里我挂载到了/data
目,当然,如果你是用了我下面的配置参数,那每个日志文件最多也就1个G,除非你的业务需要日志文件保留很长时间,否则就按照第三步的配置来就行了
mkdir /data/kafka-logs
三、修改配置文件(列出来的都是可避坑配置项,血与泪的教训)
kafka的配置文件为conf/server.properties
修改此配置文件即可
注意,只有broker.id
和listerners
的要和具体的节点相对应,剩下的是直接相同的。下文中列出来的都是需要修改或者添加的
############################# Server Basics #############################
broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.1.150:9092 #如果不配置,默认取PLAINTEXT://your.host.name:9092
############################# Log Basics #############################
log.dirs=/home/xzp/kafka-logs
delete.topic.enable=true #通过配置此项使得删除topic的指令生效
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.cleanup.policy=delete # 日志的清除策略:直接删除
log.retention.hours=72 # 日志保存时间为3天
log.segment.bytes=1073741824 # 每个日志文件的最大的大小,这里为1GB
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#配置zookeeper集群url,这里很重要
zookeeper.connect=192.168.1.145:2181,192.168.1.150:2181,192.168.1.138:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
尤其注意这里的 Log Retention Policy配置项,清除无用的日志文件很重要.
四、启动kafka:
这里注意,记得要先启动zookeeper。确保zookeeper启动以后再执行kafka的启动命令:
QuorumPeerMain就是zookeeper进程bin/kafka-server-start.sh -daemon config/server.properties
七、检测是否成功启动:
有kafka进程存在即成功启动八、kafka常用操作流程:
Step 1: Start the server
后台方式启动,推荐第一次配置的新手不要加入-daemon参数,看看控制台输出的是否有success.
bin/kafka-server-start.sh -daemon config/server.properties
Step 2: Create a topic(replication-factor一定要大于1,否则kafka只有一份数据,leader一旦崩溃程序就没有输入源了,分区数目视输入源而定)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic nodeHlsTest
Step 3: Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic nodeHlsTest
step 3: list the topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
step 4: send some message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nodeHlsTest
step 5: start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nodeHlsTest --from-beginning
step 6: delete a topic
要事先在 serve.properties
配置 delete.topic.enable=true
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic nodeHlsTest
# 如果仍然只是仅仅被标记了删除(zk中并没有被删除),那么启动zkCli.sh,输入如下指令
rmr /brokers/topics/nodeHlsTest
网友评论