1.前言
what?
kafka是一个分布式且基于发布/订阅的消息系统。
架构图如下:
概念:
producer:生产者,负责发布消息到kafka
broker:代表一台或多台服务器
topic:每条发送到kafka集群的消息都会有一个类别,物理上不同的topic存储到不同的broker,逻辑上一个topic可能分别存储在一个或多个broker上,但是生产者或消费者只需要指定topic而无需关心存储
zookeeper:集群管理
流程:
Producer使用push模式将消息发布到broker,而Consumer使用pull模式从broker订阅并消费消息。
2.下载kafka
wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz
tar -zxvf kafka_2.10-0.10.0.1.tgz
mv kafka_2.10-0.10.0.1 /usr/local/kafka
3.配置kafka
cd /usr/local/kafka/config
vim zookeeper.properties
#文件中dataDir=/tmp/zookeeper代表zookeeper数据存储路径,clientPort=2181代表zookeeper的端口号为:2181,消费者需要监听的端口
#修改kafka的server配置
vim server.properties
#把listeners行的注释去掉,这个表示kafka消息队列的监听端口,用于生产者监听使用
#把listeners修改为如下:
listeners=PLAINTEXT://你的ip地址:9092
#倒数第三行 ‘zookeeper.connect=localhost:2181’ 代表连接zookeeper
4.简单使用
kafka启动顺序:先启动zookeeper,再启动kafka server
cd /usr/local/kafka
#启动zookeeper(窗口1)
bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka(窗口2)
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
#启动消费者(窗口3)
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
#启动生产者(窗口4)
cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list 你的IP地址:9092 --topic test
#随便写入消息后回车,这时窗口3的消费者就会立即显示你写入的消息
5.遇上的问题:
①Apache kafka: Failed to acquire lock on file .lock in tmp/kafka-logs
②ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.NoClassDefFoundError: Could not initialize class kafka.network.RequestChannel$
解决:可能是虚拟机的主机名有问题
vim /etc/sysconfig/network
HOSTNAME=你的IP地址reboot
网友评论