一.什么是kafka
Kafka是一种消息中件间
在了解kafka之前,我们先来了解一下什么是消息中间件
消息中间件是在消息的传输过程中保存消息的容器
消息中间件的作用就是中介的作用
一个常见的最简单的UGC内容应用,至少包括APP、后端、审核后台、统计平台)。
UGC应用的生命力在于用户产生内容的质量,所以需要审核用户提交的内容;
另一方面,需要统计用户产生内容的数据。
用户A产生的内容C在系统间的流动路径是这样的,APP -> 后端 -> 审核 -> 统计,其中审核和统计可以同时进行,这就遇到了各系统间的数据通信问题。
简单的解决方案是所有的系统公用一套数据库(单库或者集群,总之数据库结构一样)。这样的好处是实现简单,可靠;坏处是没有关注平台间的差异性,不同平台需要的数据结构不一样,通用的数据库容易造成各个平台都达不到最好的性能,比如统计需要对所有数据进行计算;
另外各个系统之间在数据库发生了耦合,在统计和审核过程中产生的垃圾SQL可能影响在线的业务。引入消息中间件作为各个系统间通信大使可以有效的解决上述问题:数据在后端产生后,可以扔到消息中间件,其余需要这个数据的系统订阅改消息中间件就可以拿到这条数据并进行自己业务内的操作。这样做的最大好处是系统隔离,系统之间不互相影响,缩小问题的影响范围,避免问题的连锁反应——把三个容易出问题的系统绑在一起,出问题的概率不止提升了三倍。
消息中间件就像是快递员,把东西给我,告诉我送给谁,你忙你的去吧。
图片.png
消息中间件的作用:
生产者 ---消息中间件---消费者
这个就是著名的生产消费模型
kafka对消息保存时根据Topic进行归类
发送消息者成为Producer,消息接受者成为Consumer
kafka集群有多个kafka实例组成,每个实例(server)称为broker。
图片.png
二.topics/logs
一个Topic可以认为是一类消息
每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。
例如生产环境:
/日志存放路径 /topic名字/00000000000000000000.index
/日志存放路径 /topic名字/ 00000000000000000000.log
“任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制(index)来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”
图片.png
kafka:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.
对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.
partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.(具体原理参见下文).
三.Distribution(分布式)
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.
四.Producers生产者
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等.
kafka的配置文件中配置了 partition的数量和 replication的数量
图片.png
但是这个是 默认值 创建topic时 不指定partiton数量的话才会使用这个默认值
比如我们拿一个创建topic的例子来说:
例子: (先忽略语法,领会意思)
创建一个叫做“test”的topic,它只有一个分区,一个副本。
bin/kafka-topics.sh --create --zookeeper 10.0.x.x(zk的地址):2181/kafkagroup --replication-factor 1 --partitions 2 --topic test
可以通过list命令查看创建的topic:
bin/kafka-topics.sh --list --zookeeper 10.0.x.x:2181/kafkagroup
Test
负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.
其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.
异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。
五.Consumers消费者
本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
在kafka中,一个partition中的消息只会被group中的一个consumer消费;
每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.
Guarantees
- 发送到partitions中的消息将会按照它接收的顺序追加到日志中
- 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.
- 如果Topic的"replicationfactor"为N,那么允许N-1个kafka实例失效.
六. Zookeeper对kafka的作用
1.zookeeper 作用必须掌握的
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
1) Broker node registry: 当一个kafkabroker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.
2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.
格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引号.
3) Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".
一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.
2.下面zookeeper作用可以自己研究
供参考
4) Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
格式:/consumers/[group_id]/ids/[consumer_id]
仍然是一个临时的znode,此节点的值为{"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.
5) Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.
格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.
6) Partition Owner registry: 用来标记partition被哪个consumer消费.临时znode
格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id当consumer启动时,所触发的操作:
A) 首先进行"Consumer id Registry";
B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
3.总结性说明
总结
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partitionleader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.
在其中我们知道如果要kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的,以下是对zookeeper进行一些简单的介绍:
七.Zookeeper简介
zookeeper翻译成英文叫动物园管理员 动物员管理员的作用是什么呢??
1.让大象(hadoop),蜂巢(hive) ,猪(pig)能够更友好的在一起,以上几种都是hadoop的组件
2.ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务
3.zookeeper其实就是一个软件,所有安装了zookeeper的服务器都叫 zookeeper server
4.zookeeper server 还分为两类角色,由 leader 和 follower 组成,如果leader挂掉,会有选举机制,follower直接替换leader ,leader只有一个,剩下的都是follower
5.zookeeper 的所有服务器中的所有数据结构(树形结构)是完全相同的,
就是说我搭建一个zookeeper集群,集群里面所有机器的数据是一样的
数据是树形结构的,与linux目录结构是一样一样的,zk的每个数据目录就是一个znode
待安装完成之后,进入到zk的bin目录下有一个 zkCli.sh(这是zk自带的客户端)
[zk: localhost:2181(CONNECTED) 0] ls /
下面显示出来的每个目录就是一个znode
6.我需要运行几个ZooKeeper? 你运行一个zookeeper也是可以的,但是在生产环境中,你最好部署3,5,7个节点。部署的越多,可靠性就越高,当然最好是部署奇数个,偶数个不是不可以的,但是zookeeper集群是以宕机个数过半才会让整个集群宕机的,所以奇数个集群更佳
Ok。。。多说无意,我们来实战一下,安装5台zookeeper集群
网友评论