一、kafka基本介绍
Kafka:分布式流处理系统
1、高吞吐量
2、分布式
3、发布-订阅模式
4、实时流处理
Kafka优点:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
支持Kafka
Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输
同时支持离线数据处理和实时数据处理
二、kafka基本概念
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。
它能扩大日志在单个服务器里面的大小, 每个分区大小必须适应它从属的服务器的规定的大小, 但是一个topic可以有任意很多个分区, 这样topic就能存储任意大小的数据量。
分区还和并发有关系。
为了使Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
Topic在逻辑上可以被认为是一个queue。每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。
无论发布到Kafka的数据是否有被消费, 都会保留所有已经发布的记录, Kafka使用可配置的数据保存周期策略。
在存储数据上, kafka提供高效的O(1)性能处理算法, 可以长期保存数据。
每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置. 位移位置是被消费者控制, 消费者可以按照任何他喜欢的次序进行消费。
Producer
负责发布消息到Kafka broker
生产者发布消息到他们选择的topic中, 生产者负责选择记录要发布到topic的那个分区中, 这个可以简单通过轮询的方式进行负载均摊, 或者可以通过特定的分区选择函数(基于记录特定键值),。
消费者使用消费组进行标记, 发布到topic里面的每条记录, 至少会被消费组里面一个消费者实例进行消费。
Consumer
消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
每一个consumer实例都属于一个consumer
group,每一条消息只会被同一个consumer
group里的一个consumer实例消费。
允许不同consumer
group同时消费同一条消息
Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
三、支持的客户端类型
如上图:
4个分区
2个消费组
消费组A有2个消费者, 消费组B有4个消费者
producer向broker
push消息并由consumer从broker
pull消息。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
三、与传统消息系统的区别
队列模型: 一群消费者从服务器读取记录, 每条记录会分发到其中一个消费者。
优点:可以把数据处理提交到多个消费者实例中, 适用于数据处理的水平扩展。
缺点:不是多订阅的, 一旦其中的一个消费者读取了记录, 则记录就算处理过了.
发布和订阅模型: 记录分发给所有的消费者。
优点:允许你广播到记录到不同的订阅者上。
缺点:但是这种方式没法对不同的订阅者进行负载均摊。
数据写入kafka时被写入到磁盘, 并复制到其他服务器上进行容错:
策略一:所有从服务器接收消息并存储后才得到写成功的通知, 否则就认为失败。
策略二:一台从服务器接收消息并存储后即认为成功, 否则就认为失败。
策略三:不接受从服务器通知,数据发出即认为成功。
kafka很有效率利用了磁盘结构,无论你存储的是50KB或50TB的数据在kafka上, kafka都会有同样的性能。
kafka的流数据处理器是持续从输入的topic读取连续的数据流, 进行数据处理, 转换, 后产生连续的数据流输出到topic中
四、常用api
1、启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2、启动kafka
bin/kafka-server-start.sh config/server.properties
3、创建topic
bin/kafka-topics.sh --create --zookeeper
localhost:2181 --replication-factor 1
–partitions1 --topic test
4、查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
5、查看topic信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topictest
6、修改topic
bin/kafka-topics.sh--zookeeperzk_host:port/chroot--alter --topic test
--partitions 40
7、删除topic
bin/kafka-topics.sh --zookeeperzk_host:port/chroot--delete --topic test
默认不可删除,须在配置文件中设置delete.topic.enable=true
8、添加配置
bin/kafka-configs.sh --zookeeperzk_host:port/chroot--entity-type topics --entity-
name topic_name --alter --add-config x=y
9、删除配置
bin/kafka-configs.sh --zookeeperzk_host:port/chroot--entity-type topics --entity-
name my_topic_name--alter --delete-config x
10、发布数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
11、消费数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topictest --from-
beginning
12、查看consumer offset
bin/kafka-run-class.shkafka.tools.ConsumerOffsetChecker –zookeeper
localhost:2181 --group test13、查看consumer
group
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092–list
14、描述consumer group
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe–group
test-consumer-group
15、connect API
bin/connect-standalone.shconfig/connect-standalone.properties config/connect-file-
source.properties config/connect-file-sink.properties
从文件中读取数据并传输到另一文件
网友评论