First, install Java and the Gradle build tool (using macOS as the example).
java -version  # check the current Java version
Install Gradle (see https://gradle.org/install/). On macOS:
brew install gradle
Download the Kafka source:
wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka-1.1.0-src.tgz
tar -xzf kafka-1.1.0-src.tgz
cd kafka-1.1.0-src
Then build it in the Kafka source directory:
gradle build
The current Kafka package ships with a ZooKeeper service.
Start the local ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
# Kafka default configuration (config/server.properties)
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
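Several of these defaults are easier to read in human units. A small sketch, using values copied from the file above:

```python
# Interpret some default server.properties values in human-readable units.
defaults = {
    "log.retention.hours": 168,            # how long log segments are kept
    "log.segment.bytes": 1073741824,       # max size of one log segment file
    "socket.request.max.bytes": 104857600, # max size of a single request
}

retention_days = defaults["log.retention.hours"] / 24
segment_gib = defaults["log.segment.bytes"] / 2**30
request_mib = defaults["socket.request.max.bytes"] / 2**20

print(f"retention: {retention_days:.0f} days")  # 168 h = 7 days
print(f"segment size: {segment_gib:.0f} GiB")   # exactly 1 GiB per segment
print(f"max request: {request_mib:.0f} MiB")    # 100 MiB per request
```

So by default a broker keeps data for a week, rolls log segments at 1 GiB, and rejects single requests larger than 100 MiB.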
Single-node, single-broker configuration
Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Send messages (start a console producer):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
With both the producer and consumer running, messages typed into the producer appear in the consumer automatically.
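Under the hood, the producer routes each record to a partition: records with a key are hashed to a partition, keyless records (as sent by the console producer) are spread across partitions. A simplified sketch of that routing; the real Java client uses murmur2 hashing, `zlib.crc32` is used here only for illustration:

```python
import zlib
from itertools import count

def pick_partition(key, num_partitions, _counter=count()):
    """Route a record: hash the key if present, otherwise round-robin."""
    if key is None:
        return next(_counter) % num_partitions
    return zlib.crc32(key.encode()) % num_partitions

# The 'test' topic above has a single partition, so everything lands in 0.
assert pick_partition("any-key", 1) == 0
assert pick_partition(None, 1) == 0

# With 3 partitions, the same key always maps to the same partition,
# which is what preserves per-key ordering in Kafka.
p = pick_partition("user-42", 3)
assert pick_partition("user-42", 3) == p
```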
Single-node, multi-broker configuration
Building on the single-broker setup above (config/server.properties), we need to start two more broker instances, so make two copies of the existing server.properties and adjust them.
# Original configuration -- config/server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# New configuration -- config/server-two.properties
broker.id=2
listeners=PLAINTEXT://:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-two
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# Second new configuration -- config/server-three.properties
broker.id=3
listeners=PLAINTEXT://:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-three
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
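The three property files above differ only in `broker.id`, `listeners`, and `log.dirs`. Rather than copying by hand, the extra broker configs can be derived from the base file; a minimal sketch (the appended `listeners` line assumes the base file does not already set one, as in this walkthrough):

```python
# Derive a per-broker config from a base server.properties.
# Only broker.id, listeners, and log.dirs need to differ per broker.

def make_broker_config(base_text, broker_id, port, log_dir):
    overrides = {
        "broker.id": str(broker_id),
        "log.dirs": log_dir,
    }
    lines = []
    for line in base_text.splitlines():
        key = line.split("=", 1)[0]
        lines.append(f"{key}={overrides[key]}" if key in overrides else line)
    # The base config has no listeners line, so append one for the new port.
    lines.append(f"listeners=PLAINTEXT://:{port}")
    return "\n".join(lines)

base = "broker.id=0\nlog.dirs=/tmp/kafka-logs\nnum.partitions=1"
print(make_broker_config(base, 2, 9093, "/tmp/kafka-logs-two"))
```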
Start the brokers:
Broker 1
bin/kafka-server-start.sh config/server.properties
Broker 2
bin/kafka-server-start.sh config/server-two.properties
Broker 3
bin/kafka-server-start.sh config/server-three.properties
Create a topic with three replicas:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Output:
Created topic "my-replicated-topic".
See what each broker is doing:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Output:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,0,2 Isr: 3,0,2

The first line is a summary of the topic; each subsequent line describes one partition (this example has a single partition, so there is only one such line).
Leader, Replicas, and Isr are explained in the official documentation as follows:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Producing and consuming messages works the same as in the single-broker case.
The current leader is broker 3. To test Kafka's fault tolerance, kill the broker with id=3:
ps aux | grep server-three
then kill the matching process (e.g. kill <pid>).
At that point a new leader is elected, and the former broker 3 is no longer in the Isr list. (Messages produced before the failure can still be consumed from the beginning.)
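The failover behaviour can be sketched as a tiny simulation: the dead broker is dropped from the ISR, and if it was the leader, a surviving in-sync replica takes over. (Real Kafka elects leaders via the controller; this is only an illustration of the state change seen in `--describe`.)

```python
def fail_broker(leader, isr, dead_broker):
    """Simulate a broker failure: shrink the ISR, re-elect if the leader died."""
    isr = [b for b in isr if b != dead_broker]
    if leader == dead_broker:
        leader = isr[0]  # promote the first surviving in-sync replica
    return leader, isr

# State from the --describe output above: Leader: 3, Isr: 3,0,2
leader, isr = fail_broker(3, [3, 0, 2], dead_broker=3)
assert leader == 0    # a surviving replica takes over
assert isr == [0, 2]  # broker 3 dropped out of the ISR
```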
