kafka
一、Kafka是什么:kafka一般用来缓存数据。
1.开源消息系统
2.最初是LinkedIn公司开发,2011年开源。2012年10月从Apache Incubator毕业。
- 项目目标是为处理实时数据,提供一个统一、高通量、低等待的平台。
3.Kafka是一个分布式消息队列。
- 消息根据Topic来归类,发送消息 Producer,接收 Consumer。
- kafka集群有多个kafka实例组成,每个实例成为broker
4.无论是kafka集群,还是 producer consumer 都依赖于 zookeeper 集群保存元信息,来保证系统可用性。
二、消息队列
1.点对点
2.发布、订阅模式
三、为什么需要消息队列
1.解耦
2.冗余
- 消息队列把数据进行持久化,直到他们已经被完全处理。
3.扩展性
4.灵活性
5.可恢复性
6.顺序保证(相对)
- kafka保证一个Partition内部的消息有序。
7.缓冲
8.异步通信
- 很多时候,用户不想也不需要立即处理消息。
- 消息队列提供异步处理机制,允许用户把消息放入队列,但不立即处理。
四、Kafka架构
kafka读写过程1.Producer 消息生产者,就是往kafka中发消息的客户端
2.consumer:消息消费者,向kafka broker中取消息的客户端。
3.topic 理解为队列。
4.Consumer Group 消费者组
- 组内有多个消费者实例,共享一个公共的ID,即groupID。
- 组内所有消费者协调在一起,消费topic。
- 每个分区,只能有同一个消费组内的一个consumer消费。
5.broker
- 一台kafka服务器就是一个broker
6.partition:一个topic分为多个partition
- 每个partition是一个有序队列。
- kafka保证按一个partition中的顺序将消息发送个consumer。
- 不能保证topic整体有序
7.offset:Kafka存储文件按照offset.kafka命名。
五.kafka安装部署
1.前提:
- 安装好zookeeper
2.下载
3.上传到linux
4.解压
cd /opt/software
tar -zxvf kafka_2.11-2.1.1.tgz -C /opt/module
cd /opt/module
mv kafka_2.11-2.1.1/ kafka/
5.修改配置文件
a.server.properties
broker.id=0 //broker 的 全局唯一编号,不能重复
delete.topic.enable=true //允许删除topic
num.network.threads=3 //处理网络请求的线程数量
num.io.threads=8 //用来处理磁盘IO的线程数量
image.png
log.dirs=/usr/local/kafka_2.11-2.1.1/logs //kafka运行日志存放的路径
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181 //zookeeper相关信息
image.png
image.png
6.发送到其他两台
(1)注意:一定要更改broker.id的值
(2)分发
cd /opt/module
scp -r kafka/ bigdata122:/opt/module
scp -r kafka/ bigdata123:/opt/module
(3)更改另外两台的broker.id
7.启动
(1)三台都启动
cd /opt/module/kafka
bin/kafka-server-start.sh config/server.properties &
8.关闭
cd /opt/module/kafka
bin/kafka-server-stop.sh stop
六.kafka常用命令:
启动:
cd /opt/module/kafka..
bin/kafka-server-start.sh -daemon ./config/server.properties
1.topic相关:
(1)创建主题
bin/kafka-topics.sh --zookeeper hadoop1:2181 --create --replication-factor 3 --partitions 1 --topic topic0908
参数 | 意义 |
---|---|
--create | 表示建立 |
--zookeeper | 表示ZK地址,可以传递多个,用逗号分隔--zookeeper IP:PORT,IP:PORT,IP:PORT |
--replication-factor | 表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量 |
--partitions | 表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每个线程处理一个分区,分区越多吞吐量就会越大,但是分区越多也意味着需要打开更多的文件句柄数量,这样也会带来一些开销。 |
--topic | 表示主题名称 |
(2)查看kafka集群的所有topic
bin/kafka-topics.sh --zookeeper hadoop1:2181 --list
(3)创建名称为topic1217的topic,partitions为1个,副本为1个
bin/kafka-topics.sh --zookeeper hadoop1:2181 --alter --topic topic1217 --partitions 1
(4)为topic1217添加partition为10,只能增加,不能减少
bin/kafka-topics.sh --zookeeper hadoop1:2181 --alter --topic topic1217 --partitions 10
(5)删除名称为topic0908的topic(只删除zookeeper内的元数据,消息文件须手动删除)
bin/kafka-topics.sh --zookeeper hadoop1:2181 --delete --topic topic0908
(6) 查看topic为topic1217的详细信息
bin/kafka-topics.sh -zookeeper hadoop1:2181 -describe -topic topic1217
2.consumer相关
(1)查看所有consumer的group
bin/kafka-consumer-groups.sh --bootstrap-server hadoop1:9092 --list
(2) 查看group为group_test消费的机器以及partition
bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server hadoop1:9092
3.控制台相关
(1)控制台向topic1217发送数据(生产消息)
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic1217
(2)控制台接收topic1217数据(消费消息)
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic topic1217 --from-beginning
网友评论