本文的示例代码参考kafka-demo
目录
服务
# Download and unpack the Kafka 2.6.0 (Scala 2.13) release, then enter its directory.
# NOTE(review): mirrors tend to drop old releases; if this URL 404s, the same file
# should be available under https://archive.apache.org/dist/kafka/2.6.0/ — confirm.
wget https://mirror.bit.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar xf kafka_2.13-2.6.0.tgz && cd kafka_2.13-2.6.0
# Start ZooKeeper with the bundled config.
# NOTE(review): presumably both start scripts stay in the foreground, so run each
# server in its own terminal (or pass -daemon) — confirm.
./bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker with the bundled config.
./bin/kafka-server-start.sh config/server.properties
# Create the quickstart-events topic on the broker at localhost:9092, then list topics to verify.
./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Expected output:
# quickstart-events
监控
CMAK
# Build CMAK from source and start it against the local ZooKeeper.
# Check the installed Java version first (sample output from the author's machine below).
java -version
# java version "11.0.8" 2020-07-14 LTS
git clone https://github.com/yahoo/CMAK.git && cd CMAK
# Build the distributable zip; it is unpacked from target/universal/ in the next step.
./sbt clean dist
unzip target/universal/cmak-3.0.0.5.zip
cd cmak-3.0.0.5
# Replace the placeholder host name kafka-manager-zookeeper with 127.0.0.1.
# `-i.bak` (suffix attached to the flag) is accepted by both BSD/macOS sed and GNU sed,
# whereas `-i ""` only works on BSD/macOS. A backup is left at conf/application.conf.bak.
sed -i.bak 's/kafka-manager-zookeeper/127.0.0.1/g' conf/application.conf
./bin/cmak
启动后在浏览器访问 http://localhost:9000 (CMAK 默认端口)。
Add Cluster 配置如下(原文此处为配置截图;Cluster Zookeeper Hosts 填写 127.0.0.1:2181)
vim provider.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Demo producer: endlessly sends the numbers 0-9, UTF-8 encoded, to the
# quickstart-events topic on the broker at 127.0.0.1:9092.
from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
while True:
    for i in range(10):
        producer.send('quickstart-events',
                      '{}'.format(i).encode(encoding="utf-8"))
        # NOTE(review): the original indentation was lost; the sleep is placed
        # inside the inner loop (one message every 5 s), which matches the
        # offset growth shown in the Burrow status sample — confirm.
        time.sleep(5)
pipenv run python provider.py
vim consumer.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Demo consumer: subscribes to the quickstart-events topic as consumer group
# "group-id" on the broker at 127.0.0.1:9092 and prints every record received.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'quickstart-events', group_id='group-id', bootstrap_servers=['127.0.0.1:9092'])
# Print each record yielded by iterating the consumer.
for msg in consumer:
    print(msg)
pipenv run python consumer.py
Burrow
方法1 源码安装
# Build Burrow from source with the Go toolchain and run it with the local config.
# Sample `go version` output from the author's machine is shown below.
go version
# go version go1.14.2 darwin/amd64
git clone https://github.com/linkedin/Burrow.git && cd Burrow
go mod tidy
go install
# Show where GOPATH points (sample output below); the Burrow binary is run from its bin/ directory.
go env | grep GOPATH
# GOPATH="/Users/kevin/go"
# Edit config/burrow.toml:
#   ZooKeeper servers=[ "127.0.0.1:2181" ]
#   Kafka servers=[ "127.0.0.1:9092" ]
#   comment out the [notifier.default] section
# Resolve the binary through `go env GOPATH` instead of a path hard-coded to one
# user's home directory, so the command works on any machine.
# NOTE(review): assumes GOPATH is a single directory and GOBIN is unset — confirm.
"$(go env GOPATH)/bin/Burrow" --config-dir ./config
方法2 二进制安装
# Unpack the pre-built Burrow 1.3.4 darwin/amd64 archive into its own directory and enter it.
# NOTE(review): the archive is assumed to already be in the current directory;
# no download step is shown here.
mkdir Burrow_1.3.4
tar xf Burrow_1.3.4_darwin_amd64.tar.gz -C Burrow_1.3.4 && cd Burrow_1.3.4
# Edit config/burrow.toml:
# ZooKeeper servers=[ "127.0.0.1:2181" ]
# Kafka servers=[ "127.0.0.1:9092" ]
# Comment out the [notifier.default] section
# Start Burrow, reading its configuration from ./config.
./burrow --config-dir ./config
API参考HTTP Endpoint
# List the Kafka clusters returned by Burrow's /v3/kafka endpoint, pretty-printing the JSON response.
curl http://localhost:8000/v3/kafka | python3 -m json.tool
{
"error": false,
"message": "cluster list returned",
"clusters": [
"local"
],
"request": {
"url": "/v3/kafka",
"host": "yuanlindeiMac.local"
}
}
# List the consumer groups in the "local" cluster, pretty-printing the JSON response.
curl http://localhost:8000/v3/kafka/local/consumer | python3 -m json.tool
{
"error": false,
"message": "consumer list returned",
"consumers": [
"burrow-local",
"KMOffsetCache-yuanlindeiMac.local",
"group-id"
],
"request": {
"url": "/v3/kafka/local/consumer",
"host": "yuanlindeiMac.local"
}
}
# Fetch the status (including lag figures) of consumer group "group-id" in the "local" cluster.
curl http://localhost:8000/v3/kafka/local/consumer/group-id/status | python3 -m json.tool
{
"error": false,
"message": "consumer status returned",
"status": {
"cluster": "local",
"group": "group-id",
"status": "OK",
"complete": 1,
"partitions": [],
"partition_count": 1,
"maxlag": {
"topic": "quickstart-events",
"partition": 0,
"owner": "",
"client_id": "",
"status": "OK",
"start": {
"offset": 9409,
"timestamp": 1605580756934,
"observedAt": 1605580756000,
"lag": 0
},
"end": {
"offset": 9423,
"timestamp": 1605580826991,
"observedAt": 1605580826000,
"lag": 0
},
"current_lag": 35,
"complete": 1
},
"totallag": 35
},
"request": {
"url": "/v3/kafka/local/consumer/group-id/status",
"host": "yuanlindeiMac.local"
}
}
网友评论