kafka 是一个分布式消息中间件,支持多分区,多副本,多订阅者的,基于zookeeper协调的分布式消息系统
特点
1 . 高吞吐量,低延迟: 每秒处理几十万数据,延迟最低只有几毫秒
2 . 可扩展性:kafka集群支持热扩展
3 . 持久性,可靠性: 消息被持久化道本地磁盘,并且支持数据备份
4 .容错性: 允许集群中节点失败(若副本数位n,允许n-1个节点失败)
5 . 高并发: 支持数千个客户端同时读写
主要应用于大数据实时处理领域
1 架构
基本架构-
producer: 消息生产者,向kafka集群发送消息的客户端
-
consumer: 消息消费者,向kafka集群取消息的客户端
-
consumer group:消费者组,由多个消费者组成。 消费者组内的每个消费者消费不同partiton的数据,一个partition只能由一个组内的消费者消费,消费者组之间互不影响 消费者组在逻辑上就是i一个订阅者
-
broker: 一台kafka服务器就是一个broker。 一个集群由多个broker组成, 一个broker可以容纳多个topic
-
topic: Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
-
partition: 一个topic可以分不到多个broker上, 此时每个broker 上的topic 就是对应的一个partition(分区), 每个partition是一个有序的队列
-
replica: 副本, 为了防止集群中的某个broker故障,导致对应的partition的数据丢失 而提供的副本机制,一个topic 每一个partition可以有多个副本,即一个leader 对应若干个follower
-
leader : 每个partition 的副本对应的主副本,生产者和消费者通信的对象都是leader
-
follower: 每个partition 的副本对应的从副本, 实时同步leader数据,当leader发生故障时,某个follower就会成为新的leader
-
offset: 偏移量, 每条消息都有自己的偏移量,是消息数据在对应partition中的唯一标识, 也是该消息的索引号。每个consumer都会保存自己消费到的offset+1,consumer 的消费的offset 保存在broker集群中专属的topic中(_consumer_offsets); (0.10.x 版本之前保存在zookeeper中), 在kafka 中提交的offset 都是指的下一条待消费的数据, 即已消费的offset+1
-
message : 消息, 简单来说kafka 中的每个message 由一对key-value 组成, 消息结构如下:
message
2. 生产者(producer)
关键参数:
参数名 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单,可以是一个或者多个,用逗号隔开 |
key.serializer和value.serializer | 指定key和value的序列化类型 |
buffer.memory | RecordAccumlator 缓冲区的总大小,默认32M |
batch.size | 缓冲区一批次数据的最大值,默认16k,适当增加该值,可以提高吞吐量 |
linger.ms | 如果数据未达到batch.size, 在设置的linger.ms 设置的等待时间到来后,就会发送数据,默认是0ms, 表示没有延迟,一般设置为5~10ms |
ack | broker 接收生产数据后的应答机制。 0: 生产者只管发送,不等待broker的应答;1: 生产者发送完数据,只等待leader 节点的应答;-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有节点同步完后应答 |
max.in.flight.requests.per.connection | 指定了生产者在收到服务器响应之前可以发送多少个消息(发送请求的缓存,一个batch 一个request)。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。默认是5 |
retries | 消息发送失败后的重发次数,默认是int的最大值 2147483647 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms |
enable.idempotence | 是否开启幂等性,默认是true |
compression.type | 生产者发送数据的压缩方式,默认是none,不压缩,支持的压缩方式:gzip,snappy,lz4,zstd |
2.1 发送流程
流程在消息发送过程中,涉及到了两个线程--main线程和sender线程,在main线程中流程如下:
1 . 将消息数据发送给Interceptors 预处理( 可选),然后通过Serializer 进行序列化处理
2 . 序列化之后的数据通过分区选择器, 将消息发送给对应的双端队列(RecordAccumulator, 默认是32M)
sender 流程如下:
1 . 从队列(RecordAccumulator)中拉取消息, 有两种拉取策略
- .batch.size:只有队列中积累的数据量达到batch.size大小后,sender才会发送数据,默认是16K
- . linger.ms:如果数据没达到数据量batch.size, sender会根据linger.ms设置的时间发送数据,默认是0ms, 即无延迟发送
2 . 当拉取到数据后,就会将数据通过selector 发送给对应分区的leder 副本的broker
3 . broker接收到数据后,通过发送ack应答,表明数据发送成功。 ack 应答机制有如下的三种:
- .0: 生产者只管发送,不等待broker的应答
- .1: 生产者发送完数据,只等待leader 节点的应答
- .-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有节点同步完后应答
4 . 如果生产者没收到ack应答,就将重试发送
2.2 partition 分区
2.2.1 分区的作用
- .便于合理使用存储资源,每个partition在一个broker上存储,切割海量数据到不同的的broker上, 合理的控制分区任务,可以实现负载均衡
- . 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费
2.2.2 分区策略
- 默认的分区策略:
1 . 如果生产者指定了发送的分区,则按照指定的分区发送
2 .如果没指定分区,则按照消息中的key的hash值对设置的分区数取模
3 . 如果分区和key 都没指定,则选择粘性分区(sticky partition,随机选择分区并不变),直到对应的recordAccumulator的batch.size满了或者linger.ms 时间到了,再随机选择其他分区
- 自定义分区器:
2.3 数据可靠保证
2.3.1 Ack应答机制
Ack 的应答机制确保数据不会丢失,Ack 应答机制有如下的三种, 可靠程度依次提高
0 : 生产者只管发送,不等待broker的应答(不推荐)
缺点: 无法保证broker 是否收到了数据
1 : 生产者发送完数据,只等待leader 节点的应答
缺点: 有可能leader 节点ack 应答后,还没同步就挂了,导致后续新选举的leader 节点丢失了该条消息
-1(all): 生产者发送完数据,等待leader节点和ISR队列中的所有follower节点同步完后应答
ISR队列(in-sync replica set): 和leader 保持同步的follower和leader节点的集合(leader:1,isr:1,2,3)
例如: 主题为test的topic 创建了一个分区,总共有两个副本,其中leader 副本对应的broker id=2, follower副本对应的broker id=1,此时isr队列中的集合就是(2,1)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
Topic: test TopicId: MPY45pufQ_m-bW7h_cSULQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
如果follewer 节点长时间没有发送通信请求或者同步数据,则将被踢出ISR 队列, 该时间阈值由参数replica.lag.time.max.ms 设定,默认是30s
总结 三种应答机制的可靠性依次提高,但是在特殊情况下,-1的应答机制也没办法完全保证数据不丢失。例如: 分区副本只有一个,或者isr 中的节点数只有一个, 此时相当于ack=1, 仍然有丢失数据的风险
确保数据不丢失的条件: ack=-1 + 分区副本>1 + isr队列中节点数>1
2.3.2 数据幂等性
procuder 不论向broker 发送多少次重复的数据, Broker 端都只会持久化一条数据,保证数据不重复(去重)
去重标准:具有<PID,Partition,SeqNumber> 相同主键的消息提交时,Broker只会持持久化一条,其中PID 是每次producer分配的一个新的id,Patition 分区号,Sequence Number 单调自增
幂等性幂等性只能保证在单分区单会话内的不重复
幂等性的开启通过参数:enable.idempoyence= true 设置
2.3.3 生产者事务
开启事务必须要开启幂等性
######## 待补充################
2.3.4 数据有序
-
在kafka 1.x版本之前 ,为了保证数据的单分区有序,条件如下:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
-
kafka 1.x及以后版本为了保证单分区有序,条件如下:
- 未开启幂等性
max.in.flight.requests.per.connection=1 - 开启幂等性
max.in.flight.requests.per.connection<5 即可
原因说明, 在kafka1.x以后,kafka broker 端会缓存producer 发送的最近5个request 的元数据, 同时在幂等性的前提下, 通过Sequence Number对请求进行排序,此时就保证了最近5个request的数据有序
2.3.3 数据传递语义:
- 至少一次(At Least Once):
保证数据不丢失,但是不能保证数据不重复
ack=-1 + 分区副本>1 + isr队列中节点数>1
- 最多一次 (At Most Once):
保证数据不重复,但是不能保证数据不丢失
幂等性
- 精确一次 (Exactly Once)
确保数据不丢失,也不重复
幂等性 + ack=-1 + 分区副本>1 + isr队列中节点数>1
4 Broker
重要参数:
参数名 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中,follower 由于长时间未与leader 通信而导致被踢出isr的时间阈值,默认30s |
auto.leader.rebalance.enable | 自动leader partiiton平衡,默认是trrue,建议关闭 |
leader.imbalance.per.broker.percentage | 默认是10%,每个broker 允许的不平衡的leader的比例,超过这个值,会触发leader自动平衡 |
leader.imbalance.check.interval.seconds | 默认300s,检查leader负载平衡的时间间隔 |
log.segment.bytes | kafka中切割为每一块数据文件的大小,默认1g |
log.index.interval,bytes | 默认4k,每当写入4kb大小的数据后,就往index文件记录索引(稀疏索引的数据大小阈值) |
log.retention.hours | kafka 数据保存时间。默认7天 |
log.retention.minutes | kafka 数据保存时间,分钟级别,默认关闭 |
log.retention.ms | kafka 数据保存时间,毫秒级别,默认关闭 |
log.retention.check.interval.ms | 检查数据是否超过保存时间的间隔,默认5min |
log.rentention.bytes | 默认-1,表示无穷大,超时设置的所有日志的总大小后,删除最早的segment |
log.cleanup.policy | 数据文件删除策略,默认是delete,如果是compact,表示启用压缩 |
num.io.threads | 默认8,负责写磁盘的线程数,参数值占总核数的50% |
num.replica.fetchers | 默认1, 副本拉取线程数,参数值占总核数的1/3 |
num.network.threads | 默认3,数据传输线程数,参数值占总核数的2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是log的最大值,一般不建议修改,系统自己管理 |
log.flush.interva.ms | 没隔多久刷写数据到磁盘,默认是null,不建议修改 |
4.1 工作流程
4.1.1 zookeeper 存储的kafka信息
启动zookeeper 终端客户端:
[root@iZuf6g3hri8hvnuqng6id7Z apache-zookeeper-3.5.7-bin]# ./bin/zkCli.sh
通过ls 命令 查看kafka 相关信息
[zk: localhost:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
zk 信息
4.2分区副本
4.2.1 副本基本信息
- 副本作用: 提高数据的可靠性
- 默认一个副本,生产环境一般配置为2 个,保证数据可靠性, 副本太多会增加磁盘存储空间,增加网络数据传输,降低效率
- kafka 中副本分为: leader和follower, 生产者将数据发送给leader, follower 和leader 进行同步数据
- kafka 分区所有的副本统称为AR(Assigned Replicas)
AR= ISR + OSR
ISR: 表示和leader 保持同步的Follower集合,如果follower长时间没有和leader 通信或者同步数据,则该follower将被踢出ISR, 该时间戳由: replica.lag.time.max.ms参数设定,默认30s。 leader发生故障后,会从ISR中选举新的leader
OSR: 表示和follower 与leader同步时,延时过多的副本集合
4.2.2 副本leader选举流程
kafka 集群中每个broker都有对应的controller, 其中有一个controller 会被选举为controller leader( 通过查看zk的/controller 节点,可以知道选举出来的leader 节点), 负责管理集群broker的上下线和 所有topic的分区副本leader选举和分配
controller的信息同步依赖于zookeeper
- broker 启动后就会在zookeeper中注册
- controller leader 的选举机制是,先注册先成为,同时由选举出来的leader 通过zookeeper 去监听所有broker节点变化
- controller leader同时负责topic的分区副本leader的选择,选举策略如下:
在ISR中的存活为前提,按照AR中排在前的为优先,例如:AR[1,0,2], ISR[1,0,2],那么leader 就会按照[1,0,2]的顺序轮训
- controller leader 将选举的信息上传到zk 中
-
其他broker 的controller 去zk 中同步呢信息
broker运行流程
测试:
kafka 集群集群中有三个broker ,id分别为1,2,3, 此时创建主题名为test1, 分区数3,副本数3的topic,查看此时topic的详细信息如下:
test1分区0,1,2的leader 节点id为2,3,1
AR 分别为[2,3,4],[3,1,2],[1,2,3], ISR分别为[2,3,1],[3,1,2],[1,2,3]
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 3 --replication-factor 3
Created topic test1.
# 查看topic详细信息
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
Topic: test1 TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test1 Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
下线节点broker 3,此时分区1 的leader 节点挂了,需要重新选举leader,安装选举规则,应该选举 brokerid为1 的节点作为leader
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
Topic: test1 TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: test1 Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Topic: test1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2
4.2.3 broker 节点故障处理
概念理解:
LEO(Log End Offset):每个腹部的最后一个offset,其实就是最新的offset +1
HW (High WaterMark): 所有副本中最小的LEO
1 . topic 分区的follower 对应的一个节点挂了
2 . topic 分区的leader 节点挂了
4.3 文件存储
4.3.1 文件存储机制
每个topic 对应的每个分区(partition)都对应一个log 文件,该log文件中的存储的就是kafka的生产的数据,producer生产的数据会不断的追加到文件末尾, 为了防止log 文件过大,导致数据查询效率低下,kafka采用分片和索引机制。
每个partition分为多个segment,每个segment 文件包括:".index"文件,".log"文件, ".timeindex"等文件, 这些文件位于一个文件夹下,文件夹命名规则:topic名+分区序号
例如 : 查看test-0 分区的数据文件内容
[root@iZuf6g3hri8hvnuqng6id7Z test-0]# ls
00000000000000000000.index 00000000000000000000.timeindex leader-epoch-checkpoint
00000000000000000000.log 00000000000000000001.snapshot partition.metadata
log-segment
数据查询定位
4.3.2 文件删除机制
kafka 中的默认数据保存时间为7天, 可以通过如下参数修改:
- log.retention.hours : 最低优先级,默认7天
- log.retention.minutes: 优先级次之 ,分钟
- log.retention.ms: 最高优先级,毫秒
- log.retention.check.interval.ms : 负责设置检查周期,默认5min
kafka 中的数据文件清理策略有 delete和compact
- delete: 将过期数据删除
设置参数: log.cleanup.policy=delete
基于时间:默认打开,以segment 中所有记录中的最大时间戳作为该文件的时间戳, 即当一个segment中的部分数据超期了, 此时会等待该segment所有数据超期后,再删除
基于数据大小:默认关闭,超过设置的所有日志的总大小,删除最早的segment。log.retention.byte=-1 (表示无穷大, 即关闭基于数据大小) - compact: 日志压缩
对于相同key的不同value值, 只保留最后一个版本
设置参数:log.cleanup.policy=comapct
数据压缩
4.4 高效读写数据
1 . kafka本身是分布式的集群,采用分区策略,并行度高
2 . 数据存储采用稀疏索引,可以快速定位要消费的数据
3 . 顺序写磁盘: kafka 的数据写入过程是文件末尾追加的方式
- 页缓存 + 零拷贝的方式
5 消费者(consumer)
kafka 中的消费者采用pull(拉)模式
重要参数
参数名 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单,可以是一个或者多个,用逗号隔开 |
key.deserializer和value.deserializer | 指定key和value的反序列化类型 |
gruop.id | 消费者所属的消费者组id |
enable.auto.commit | 默认为true,自动周期性的提交offset |
auto.commit.interval.ms | 自动提交offset的时间间隔 |
auto.offset.reset | 初始化时偏移量的设置策略,默认latest。1. earliest:自动重置偏移量到最早的偏移量;2. latest:自动重置偏移量到最新的; 3. none:如果原来的消费者组偏移量捕存在,则向消费者抛出异常 |
offsets.topic.num.partitions | _consumer_offsets的分区数,默认50 ,不建议修改 |
heartbeat.interval.ms | kafka 消费者和coordinator之间的心跳时间,默认3s |
session.timeout.ms | kafka和coodinator之间的连接超时时间,默认45s,超过该时间,该消费者被移除消费者组,消费者组会进行再平衡 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认5min,超过该时间,该消费者被移除消费者组,消费者组会进行再平衡 |
fetch.mxax.bytes | 默认50M,消费者向broker一次拉取的最大字节数,如果服务器端一批次的数据大于该值,仍可以拉取数据,一批次的大小受到message.max.bytes(broker config)和 max.message.bytes(topic config)影响 |
max.poll.records | 一次拉取数据的最大条数,默认500 |
partition.assignment.strategy | 消费者分区分配策略,包括了 range,roundRobin,sticky,cooperativeSticky |
5.1 总体消费流程
1 . 消费者按照partition 的offest 按顺序依次读取里面的数据
2 . 一个消费者可以消费多个主题的多个分区的数据
3 . 在消费者组中, 每个分区只能由该消费者组中的一个消费者消费,防止重复消费
4 . 每个消费者的消费offset 由消费者提交到系统的主题中保存
消费流程
5.2 消费者组
Consumer Group :由多个consumer 组成, 组内成员拥有共同的groupId
- 消费者组内的每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费
- 消费者组之间互不影响
- 如果消费者组中的消费者数量多于主题分区的数量,则会有一部分的消费者处于闲置状态,不消费任何数据
5.2.1 消费者组初始化流程
概念:
coordinator: broker 中的组件,负责辅助实现消费者组的初始化和分区的分配
coordinator节点选择= groupid的hashcode %50
注:50 指的是系统主题_consumer_offsets的分区数量
例如: groupId 的hashcode=1,1%50=1, 此时间_consumer_offsets主题的1号分区所在的broker 节点的coordinator就是这个消费者组的老大,此时初始化流程如下:
- 消费者的每个消费者发送joinGroup请求给coordinator
- coordinator选出一个consumer 作为leader
- coordinator把要消费的topic情况发送给leader消费者
- leader 消费者指定消费方案并发送给coordinator
- coordinator再分发消费方案给各个消费者
- 每个消费者和coordinator 保持心跳(默认3s), 一旦超时(session.timeout.ms=45s),该消费者就会从消费者组中移除,并触发再平衡;或者消费者的处理消息时间过长(max.poll.interval.ms=5min),也会触发再平衡
kafka 有专门的分区策略来支持在消费者组中对消费分区的分配
主要策略有 Range,RoundRobin,Sticky,CooperativeSticky(3.0 版本新增), 通过配置参数: partition.assignment.strategy来设置,默认是Range 策略
-
Range 策略
Range 策略是针对于每个topic 而言的
1 首先对同一个topic 中的分区按照序号排序,并对消费者组中的消费者按照client.id字典排序
2 通过 partition 数/ consumer 数 得到商n 和余数m,则每个消费者至少分到n个分区,然后前m 个消费者多分一个分区例如: 现在有7个分区(0,1,2,3,4,5,6), 消费者组中有三个消费者(c0,c1,c2), 7/3=2---1, 那么c0就会多消费一个分。此时c0 消费0,1,2 分区;c1消费3,4分区 ;c2 消费5,6分区。
range 分区
如果8个分区的话,8/3=2----2, c0,c1就会多消费一个
注意: 如果只是针对一个topic,c0多消费一个分区影响不大,但是如果有N个topic, 那么c0就将多消费每一个topic的分区,容易产生数据倾斜
2 . RoundRobin策略
- 将所有订阅的topic和partition 组成topicAndPartition 列表,并按hashcode 进行排序,最后以轮训的方式分配给消费者
3 . Sticky策略
可以理解为分区的分配结果带有粘性,即在执行一次新的分配之前,会考虑上一次的分配结果,尽量减少调整改动,节省开销
Sticky策略是在kafka 0.11.x版本之后引入的,首先会尽量均匀的分配分区,类似图range策略,在消费者组中的某一消费者出现问题时,会尽量其他消费者的原有分区不变
例如 0,1,2,3,4,5,6 分区,被一消费者组的三个消费者c0,c1,c2 消费, 初始化分配时按照均匀分配的原则,将所有分区随机均匀分配给消费者, 假如c0 分配到0,2,3;c1 分配到1,4 ;c2 分配到5,6;此时c0 挂了,触发再分配策略,此时将c0分分区分配给c1,c2, 结果;c1 分配到的分区为1,2,4;c2 分配到0,3,5,6
5.2.2 消费者组详细消费流程
消费流程5.3 offset 偏移量
5.3.1 默认维护位置
从kafka0.9版本之后,consumer默认将消费的offset 存储在kafka 内置的主题_consumer_offsets中
_consumer_offsets里采用了key和value 的形式存储数据,key=group.id+topic+partition.id, value 就是对应的offset,每隔一段时间,kafka 都会对这个内置的topic 进行compact 压缩处理,是的每个key的value值都保留最新的数据
存储位置
5.3.2 自动提交offset
自动提交offset5.3.3 手动提交offset
手动提交offset 首先需要配置参数 enable.auto.commit=false
手动提交
5.3.4 指定offset 消费
根据配置参数可以设定offset 的初始偏移量
auto.offset.reset= earliest | latest | none(默认是latest)
- earlist: 自动将offset 设置为最早的偏移量, --from-beginning
- latest: 自动将偏移量重置为最新的偏移量
- none: 如果未找到消费者组的先前的偏移量,则向消费者抛出异常
5.3.5 漏消费和重复消费
消费者事务6 压力测试
kafka 自带压力测试脚本
- 生产者压测脚本: kafka-producer-perf-test-sh
- 消费者压测脚本: kafka-consumer-pref-test-sh
6.1 生产者压测
例:
- 创建名为test的主题,设置分区3个,副本3个
kafka-topic.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3 --partitions 3 --topic test
- 开始测试:
kafka-producer-pref-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 100000 --producer-props bootstrap-server=127.0.0.1:9092 batch.size=16384 linger.ms=0
参数说明:
- record-size : 一条消息的信息量大小,单位字节
- num-records: 总共发送的消息的数量
- throughput:每秒发送的消息数量, 设置为-1 表示不进行限制
- producer-props 生产者参数的相关信息
6.2 消费者压测
例:
kafka- consumer-pref-test.sh --bootstrap-server 127.0.0.1:9092 --topic test --messages 1000000 --consumer.config config/consumber.propertier
参数说明
- messages : 总共消费的数据数目
- consumer.config: 消费者的配置文件,可以修改配置文件中的参数来做压力测试,查看不同参数对压测的影响
网友评论