###kafka结构
1.broker服务: 一般情况下一台主机就一个broker服,但也可以一台主机有多个broker服务,只要端口不一样,存储路径不一样就可以(不推荐)
2.zookeeper服务: 管理broker集群,管理元数据
3.producer : 生产这,发布消息-主题
4.consumer: 消费者
5.consumer group : 消费组,在同一个消费组中的消费者,对同一条消息只能消费一次
6.offset: 某个消费组,在当前某个主题下的某个分区的消费偏移量
#### kafka安装和启动
1.安装zookeeper
2.kafka安装
3.kafka启动zk
4.kafka启动服务器
5.创建主题
6.创建生成者
7.创建消费者
####生产者
必须根据kafka现有主题进行数据生产
producer 注册到zookeeper
消息推送给kafka,消息进来后,在kakfa集群中
broker 注册到zookeeper
创建主题 要指定 副本数,分区数;
1个主题至少有一个分区,数据都存在分区中,在
消费者注册到 zookeeper
可以消费某个主题,某个分区,或几个主题,几个分区
###主题
kafka将消息以topic为单位进行归纳
在kafka集群中,可以有无数的主题
从生产者的角度来说,他所操作的单元,一般情况下是以主题为单位
从消费者角度来说,他所操作的单元,一般情况下也是以主题为单位
生产者、消费者可以以主题更细的单位来操作(分区)
也是消息的分类。
主题支持多个消费者订阅,也可以是0个,
从kafka角度来说,没有限制生产者,也没有限制消费者;唯一的限制,消费者、生产者必须知道主题是哪一个
>1.创建主题
kafka-topics.sh --action --topic topicName --zookeeper zookeeperip --partitions num --replication-factor num --config 属性值
action 是动作 包含 create(创建) ,alter (修改),delete(删除)
一个broker(一台服务器下),可以创建多个分区
一个broker(一台服务器),副本因子小于等于broker 数
--config 设置属性值
alter (修改)一般是修改config属性
alter --delete-config 删除属性
--delete 删除topic 是标记删除,不是物理删除,标记删除后仍然可用写入数据;在重启kafka之后才会删除,0.10里面没有删除topic的功能,需要手动配置
查看当前主题命令
kafka-topics.sh --describe --zookeeper zookeeperIp --topic topicname 针对某一个,不传是所有主题
describe: 很详细
支持 grep 过滤
示例demo
Topic:tianzehao123 (主题名) PartitionCount:2 (分区总数) ReplicationFactor:1(副本个数) Configs:
Topic: tianzehao123 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: tianzehao123 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
在kafka中每个分区会有一个编号,从0开始
leader: broker.id= 0 【默认在server.properties配置文件中,多个broker那么broker.id不能相同】kafka中如果有多个副本的话,就会存在leader与follower的关系,
replicas: 所有副本列表 0,1,2
lsr: 可用副本列表0,1,2
思考: 主题(topic )消息在进入kafka集群的时候,是以主题进行归类;也就是说一条消息必须属于一套主题;在kafka集群中可以有无数的主题,但是主题是消息的归类,
###分区
目的: 让消费者那数据时候更快;一个broker服务下可以有多个分区;分区中数据是有序的切不可修改。但不同的分区数据是无需的
思考1: 消费者指定主题的话,那么在拿数据时候,具体拿哪一个分区数据有kafkas说了算由于负载均衡,可能导致生产方数据顺序与拿到的顺序不一致。【有可能出现,先退货,然后再去发货了】?
解决这个问题: 让同一类的数据进入到同一个分区里(同一个分区下的数据是有序的)
思考2: 如何保证一个主题下的数据,一定是有序的
可以用key 来进行分区
新进来的数据会追加到某一个分区的尾部。
1.创建
topic 创建时穿件partitions
2.partition数量决定了每个consumer group中并发消费者的最大数量
3.consumer group
分区与消费组直接的关系
消费组: 有一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次。
当某一个主题下的分区数、对于消费组来说,应该小于等于该主题下的分区数
如:某一个主题下有4个分区,那么消费组中消费者应该小于4而且最好与分区数成整倍 1 ,2,4
同一个分区下的数据,在同一时刻,不能同一个消费组中的不同消费者消费;否则会造成锁死的概念
分区中的数据都有相应的编号 offset 偏移量
偏移量:记录当前有多少记录数,而且可以让消费者可以知道自己消费到什么地方,可以让消费者自定义选择消费某一条消息【对于同一条消息,消费者可以消费多次】
一般用在自定义消费
![](https://img.haomeiwen.com/i4653663/6b3fd171f2167746.png)
###副本因子
创建副本因子时,副本因子数应小于等于可用的broker数
副本因子操作的单位是以分区为单位
当有多个副本数时,kafka并不是将多个副本同事对外提供读取和写入,对外提供的只会有一个有leader对外提供读写服务。其他follower 只是备份,数据同步而已
作用是让kafka读取和写入数据时高可用
副本因子数时包含本身 | 同一个副本不能放在同一个broker中
在多个副本的情况下,kafka回味同一个分区下的分区,设定角色关系,一个leader和n个follower
leader 负责对外进行读写处理,follower负责数据同步
如果某一个分区有三个副本因子,就算其中一个挂掉,只会在剩下两个中选择一个leader 但不会在其他的broker中另起一个副本;主要是因为kafka是一个高吞吐量的消息系统,因此不会另起副本
lsr: 当前可用副本,选择时是从src中选择
ack机制: 确认机制;具体使用哪种方式,由生产者确定使用
生产方可以采用同步或者异步的方式
同步:发送一批数据给kafka之后,等待kafka返回结果;
异步:发送一批数据给kafka后,只提供一个回调的函数
生产者拿到返回结果是什么时候那
1.需要所有的副本确认,才表示该条消息写入成功,返回给生产者
2.不需要确认,数据会丢失
再次发送解决这个问题
3.只需要leader 确认, 数据也可能丢失
4.大部分确认
思考: 如何解决数据丢失,数据重复
同步异步确认机制说明图
![](https://img.haomeiwen.com/i4653663/87624d04b402eaa3.png)
###消费者拉取数据时可以采用kafka两种api
高级api : 让用户使用时,很方便。大部分操作都是已经封装好的。比如:当前消到那个位置下,但不够灵活 (推荐)
低级api: 没有进行包装,所有操作由用户决定,如自己得保存某一个分区下的几率,你当前消费到那个位置,灵活度提高需要自己为户
leader broker 是之前的一种说法,现在基本不用
kafka集群中包含很多broker,但是这么多的broker中也会有个leader
在kafka节点中的一个临时节点,去创建相应的数据。名字叫controller broker
controller broker职责是管理所有的broker
controller_epoch 是选举用到的
![](https://img.haomeiwen.com/i4653663/8a616a126214be37.png)
![](https://img.haomeiwen.com/i4653663/2930cecaf7d53e3b.png)
![](https://img.haomeiwen.com/i4653663/4534a8b062eaac6c.png)
![](https://img.haomeiwen.com/i4653663/d966a93d3aafeb32.png)
![](https://img.haomeiwen.com/i4653663/f8a3923d49996c0d.png)
![](https://img.haomeiwen.com/i4653663/b6428e7f1b46af44.png)
![](https://img.haomeiwen.com/i4653663/6eb0b15c35858a49.png)
![](https://img.haomeiwen.com/i4653663/6852fcaa039509a4.png)
###集群搭建
1.zookeeper 集群搭建
1.broker.id必须不一样,创建myid 文件
2.log.dirs修改地址,之前默认是在/tmp
3.zookeerper.connect = 自己的集群地址 ;在这一步如果是自身的话 地址一定是0.0.0.0
4.启动
启动kafka(三台主机都要启动)
nohup /usr/src/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh /usr/src/kafka_2.11-0.9.0.1/config/server.properties &
创建topic
/usr/src/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --zookeeper node1,node2,node3 --topic kafkaroot --partitions 3 --replication 3
如果不熟可以使用--help
consumer
![](https://img.haomeiwen.com/i4653663/9141e938ef4ad83b.png)
网友评论