安装
- 官网下载安装包
zookeeper
kafka
- zookeeper安装
解压修改配置项
vi zk/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/apps/tmp/zk/zkdata
dataLogDir=/root/apps/tmp/zk/zkdatalog
clientPort=2181
server.1=master:2888:3888
server.2=slave01:2888:3888
server.3=slave02:2888:3888
发送zookeeper文件夹到各个集群节点
每个集群节点的dataDir目录下创建myid文件,
内容和上面的server.id对应
启动
zk/bin/zkServer.sh start
- kafka安装
解压修改配置项
vim kafka/config/server.properties
broker.id=0 //集群各个节点的id要不等且唯一
host.name=master
log.dirs=/root/apps/tmp/kafka-logs
zookeeper.connect=master:2181,slave01:2181,slave02:2181
发送kafka文件夹到各个集群节点
关闭防火墙
service iptables stop
chkconfig iptables off
测试:
bin目录下启动
./kafka-server-start.sh -daemon ../config/server.properties
bin目录下查看topic,
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
创建topic
./kafka-topics.sh --create --zookeeper master:12181 --replication-factor 2 --partitions 1 --topic test_topic
发布消息到topic
./kafka-console-producer.sh --broker-list H30:9092 --topic test_topic
消费消息
./kafka-console-consumer.sh --zookeeper localhost:12181 --topic test_topic --from-beginning
概念
- 高吞吐量,分布式,发布订阅消息系统;
- 具有高性能,持久化,副本备份,横向扩展能力;
- 起到解耦,削峰,异步处理作用。
基本组成
- producer
- consumer
- topic
- broker
- message
特点
- 发布订阅模型
消息模型可以分为两种:队列和发布-订阅式。
队列:处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。
发布-订阅模型:消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。
Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组(consumer group)。
假如所有的消费者都在一个组中,那么这就变成了queue模型。
假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
更通用的, 可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者,一个组内多个消费者可以用来扩展性能和容错。
- 分区
创建一个topic时,同时可以指定分区数目,分区是针对topic不是message;
kafka在接收到生产者发送的消息之后,会根据不同策略将消息存储到不同的分区中。
策略:随机策略,负载均衡,自定义指定分区
好处:
并发读写,速度快
分布式存储,利于扩展
加快数据恢复,某台机器挂掉,每个topic仅需恢复一部分数据
坏处:
分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性;
分区结构:
1个分区分为多个segment;
每个segment包含两个文件log和index;
log是一个segment的消息元数据
index是log的稀疏分布索引
每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。
分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。类似主键ID。
一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。
topic配置的清理策略是compact。总是保留最新的key,其余删掉。
一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。
- 数据可靠性和一致性
Kafka集群保持所有的消息,无论消息是否被消费了,直到它们过期,。
Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。server.properties配置文件配置
Partition的多个replica中一个为Leader,其余为follower
Producer只与Leader交互,把数据写入到Leader中
Followers从Leader中拉取数据进行数据同步
Consumer只从Leader拉取数据
ISR:所有不落后的replica集合, 不落后有两层含义:距离上次FetchRequest的时间不大于某一个值或落后的消息数不大于某一个值,Leader失败后会从ISR中选取一个Follower做Leader
数据可靠性保证
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
-1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证
仅设置acks=-1也不能保证数据不丢失,当Isr列表中只有Leader时,同样有可能造成数据丢失。要保证数据不丢除了设置acks=-1, 还要保证ISR的大小大于等于2,具体参数设置:
request.required.acks:设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica
数据一致性保证
一致性定义:若某条消息对client可见,那么即使Leader挂了,在新Leader上数据依然可以被读到.
HW-HighWaterMark: client可以从Leader读到的最大msg offset,即对外可见的最大offset, HW=max(replica.offset)
对于Leader新收到的msg,client不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被client消费,这样就保证了如果Leader fail,该消息仍然可以从新选举的Leader中获取。
对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
网友评论