Apache Kafka
Kafka是专为分布式高吞吐系统设计的生产-消费消息中间件,可以处理大量的数据,适合离线和在线消息消费。Kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。Kafka构建在ZooKeeper同步服务之上。它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
优点
- 可靠性: Kafka是分布式,分区,复制和容错的
- 可扩展性:Kafka消息传递系统轻松缩放,无需停机
- 耐用性:Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的
- 性能:Kafka对于生产和消费消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。Kafka非常快,并保证零停机和零数据丢失。
组件
- Topic:主题,用于处理不同分类的消息的特定类别的消息流
- Partion:分区,主题下有多个分区,消息被分别放在不同分区,不同消费者就可以消费不同分区中的消息,从而完成并行处理,越多的分区意味着可以容纳更多的消费者,从而提升并发消费的能力
- Offset:分区偏移,每个分区消息都会被分配一个唯一序列标识
- Consumer Group:消费者组,同一主题中不同分区的消息可以交给消费者组下面不同的消费者消费,一个消费者可以消费多个分区的消息
- Broker:代理,Kafka集群的一台或多台服务器
- Cluster:集群,多个代理构成Kafka集群
- Producer:生产者,可以向一个或多个Kafka主题生产消息
- Consumer:消费者,通过消费一个或多个主题,从代理中提取生产的消息
集群架构
- Kafka是基于文件存储的,当生产者发布消息后,可以将消息以日志的形式持久化。Kafka集群由多个代理组成以保持负载均衡,Kafka代理的无状态的,需要使用ZooKeeper来维护集群状态
- 分区的存在使得同一主题下的消息可以分开持久化,将一个主题切分成任意数量的分区可以避免文件尺寸达到单机磁盘上限,并提升消息保存/消费的效率
- 分区中的每条消息包含三个属性:Offset(消息偏移量)、MessageSizize(消息长度)、Data(消息内容)
- 生产者可以异步发送消息,先将消息存在内存中,然后一次请求批量发送出去。生产者可以指定发送的消息归属某个主题下的指定分区。
- 由于Kafka代理是无状态的,消费者需要通过分区偏移来维护自己已经消费消息的数量。消费者向代理发出异步拉取请求,以具有准备好消耗字节的缓冲区。消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。
- 当Broker Leader宕机后,ZooKeeper会帮助集群重新选举Leader,并通知消费者进行调整。消费者消费消息的偏移值也由ZooKeeper保存。
- 无状态导致消息难以删除,Kafka采用基于时间的SLA,消息保存一定时间(通常为7天)后会被删除
Kafka的持久化
Kafka将数据追加到文件中实现持久化,该操作不会阻塞写操作和其它操作,由于分区的存在,数据大小也不影响性能,相对内存来说,在硬盘空间建立消息系统的容量限制更宽泛,读操作时线性访问磁盘,很多时候速度比随机地访问内存更快。
消息查找
消费者可以通过消息的偏移量区查询某个分区下的指定消息。Kafka为数据文件建立了稀疏索引,通过这些索引进行二分查找。
数据传输事务
- at most once:最多一次,消费者拿到消息后先记下Offset,再进行消费,当消费产生异常,不会重新消费。
- at least once:最少一次,消费者拿到消息后先消费,消费成功后再记下Offset,保证了每条消息在消费异常后能被重新消费,通常这种方式是首选,因为重复接收数据总比丢失数据要好
安装ZooKeeper
(Zookeeper镜像下载地址)[https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/]
# 下载ZooKeeper
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
# 解压
tar -zxvf zookeeper-3.4.12.tar.gz
# 将ZooKeeper移动到指定文件夹
mv zookeeper-3.4.12 /usr/local/zookeeper
cd /usr/local/zookeeper
# 创建data文件夹
mkdir data
# 更改配置文件dataDir属性配置为/usr/local/zookeeper/data
cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
nano ./conf/zoo.cfg
# 启动ZooKeeper服务端
./bin/zkServer.sh start
# 启动客户端
./bin/zkCli.sh
# 退出客户端
quit
# 关闭ZooKeeper
./bin/zkServer.sh stop
安装Apache Kafka
(Kafka镜像下载地址)[https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/]
# 下载Kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz
# 解压
tar -zxvf kafka_2.12-1.1.0.tgz
# 将Kafka移动到指定文件夹
mv kafka_2.12-1.1.0 /usr/local/kafka
cd /usr/local/kafka
# 修改Kafka内存配置和日志地址(export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M")
nano ./bin/kafka-server-start.sh
nano ./config/server.properties
# 启动Kafka
./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
Kafka操作命令
# 创建副本为1、分区为1、名为test的主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看主题列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181
# 查看主题详情
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
# 删除主题
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
# 使用运行Java API的方式删除主题
./bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --zookeeper localhost:2181 --topic test
# 启动生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 启动消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
网友评论