安装问题
- 1.出现broker id报错
集群启动时,需要不同的broker id,如果之前配置的一样,那么修改了server.properties,还需要修改/tmp/kafka-logs/meta.properties - 2.需要修改配置文件中的log.dirs
集群中的host不能使用一样的。
测试
- 1.生产者
import time
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
# Assign a topic
topic = 'my-topic'
def test():
print('begin')
n = 1
while (n<=100):
producer.send(topic, 'I am'+' '+str(n))
print "send" + str(n)
n += 1
time.sleep(0.5)
print('done')
if __name__ == '__main__':
test()
- 2.消费者
from kafka import KafkaConsumer
#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
for msg in consumer:
print msg
多个topic并发
python3有差异,传入topic的内容需要byte,所以做了变换。
import time
import threading
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers = ['192.168.124.116:9092', '192.168.124.116:9092', '192.168.124.116:9092'])
def test(topic,content):
print('begin')
n = 1
while (n<=50):
aa = content +' '+ str(n)
contentb = bytes(aa, encoding="utf8")
producer.send(topic, contentb)
print("send" + str(n))
n += 1
time.sleep(0.5)
print('done')
if __name__ == '__main__':
for i in range(1,3):
topic = 'my-topic' + str(i)
print(topic)
content = str(i)+' '+ 'is'
thread = threading.Thread(target=test, name='test', args=(topic, content))
thread.start()
网友评论