问题
某一天业务来找我,反映发数据到某一个Kafka集群特别慢。
并且他们提供了一份自己的测试结果,结果显示发送数据到Kafka集群A的平均响应延迟在10ms以内,但是发送到Kafka集群B的平均响应延迟却达到了2000ms+。
这种问题一般是比较头疼的,首先,我们Kafka集群都有监控和报警,通过查看可用性、流量变化、Kafka日志等方式,都没有发现任何异样;其次,响应慢也有可能和用户的使用方式和测试方法有关系。
因此第一步,我决定验证一下问题的存在。
验证问题
在kafka/bin
目录中,kafka提供了一个写请求性能测试脚本kafka-producer-perf-test.sh
。
这个脚本会运行kafka中的kafka.perf.ProducerPerformance
类,发送消息到kafka并输出CSV报告。
测试命令如下:
kafka/bin/kafka-producer-perf-test.sh --broker-list ${BROKER_LIST} --topics perf-test-topic --show-detailed-stats --messages 10000 --csv-reporter-enabled --metrics-dir ./perf-report
通过分析生成的报告,发现确实有一台节点的响应比较慢:
time | min | max | mean | median | stddev | 95% | 99% | 99.90% |
---|---|---|---|---|---|---|---|---|
1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
2 | 1184.369398 | 1184.369398 | 1184.369398 | 1184.369398 | 0 | 1184.369398 | 1184.369398 | 1184.369398 |
3 | 1184.369398 | 1308.03076 | 1246.200079 | 1246.200079 | 87.44178764 | 1308.03076 | 1308.03076 | 1308.03076 |
4 | 1036.153496 | 1308.03076 | 1176.184551 | 1184.369398 | 136.1233097 | 1308.03076 | 1308.03076 | 1308.03076 |
5 | 1036.153496 | 1308.03076 | 1176.184551 | 1184.369398 | 136.1233097 | 1308.03076 | 1308.03076 | 1308.03076 |
6 | 1036.153496 | 1308.03076 | 1170.298591 | 1168.505053 | 111.7658942 | 1308.03076 | 1308.03076 | 1308.03076 |
7 | 1036.153496 | 1308.03076 | 1195.533735 | 1184.369398 | 112.0391625 | 1308.03076 | 1308.03076 | 1308.03076 |
8 | 1036.153496 | 1308.03076 | 1176.72978 | 1168.505053 | 110.2893991 | 1308.03076 | 1308.03076 | 1308.03076 |
可以看到P999分布已经达到了1300ms左右,这显然是不正常的,但是原因是什么呢?
分析
既然日志没有问题,那只能看一下jstack信息了:
"kafka-request-handler-12" daemon prio=10 tid=0x00007fee9c7eb800 nid=0xea5a waiting for monitor entry [0x00007fecfbaf9000]
java.lang.Thread.State: BLOCKED (on object monitor)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
- waiting to lock <0x0000000640327150> (a java.lang.Object)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
"kafka-request-handler-11" daemon prio=10 tid=0x00007fee9c7e9000 nid=0xea59 waiting for monitor entry [0x00007fecfbbfa000]
java.lang.Thread.State: BLOCKED (on object monitor)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
- waiting to lock <0x0000000640327150> (a java.lang.Object)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
如上发现jstack中有非常奇怪的信息,很多kafka-request-handler线程都处于阻塞状态。
这里简单解释一下kafka的处理请求线程模型,引用一篇讲Kafka NIO网络通信的文章中的图来说明:

如图,kafka采用了Java NIO中的selector模型。一个Acceptor线程负责接受请求,多个Processor线程负责处理请求。但实际上Processor线程只是把请求封装成kafka request,然后丢到RequestChannel中(当然也负责读取response并返回,这里不展开)。真正执行请求的是KafkaRequestHandler,即jstack中的kafka-request-handler线程。
所以当kafka-request-handler线程出现大量阻塞的时候,会极大地影响整个节点的响应效率。
关于Java线程中的BLOCKED状态,可以直接看一下Java doc说明:
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
可见kafka-request-handler线程是因为抢锁而发生了阻塞。我们根据jstack信息中的kafka.cluster.Partition.appendMessagesToLeader
定位到了源码:
def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
leaderIsrUpdateLock synchronized {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val info = log.append(messages, assignOffsets = true)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
case None =>
throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
.format(topic, partitionId, localBrokerId))
}
}
}
可以看到这个方法确实是同步的,同步对象是leaderIsrUpdateLock。由于leaderIsrUpdateLock是kafka.cluster.Partition
的成员变量,也就是说只有在写同一个topic partition的时候,才会发生互斥等待。
所以发生上面问题的原因只可能是某个topic有大量的写请求,而且这个topic的partition数量不多,导致并发不足。
于是大量该topic的ProduceRequest占用了kafka-request-handler线程池,但是这些线程之间互相抢锁,执行效率比较低,从而导致其他topic的请求无法及时被处理。
解决
通过分析日志和查看监控流量,定位到集群中某个topic的ProduceRequest请求的QPS占了整个集群的80%以上。
通过查看该topic监控指标中的单位时间内的消息条目数(MessagesInPerSec)和单位时间内的发送请求数(ProduceRequestPerSec),可以计算出该Topic平均不到10条消息就会触发一次kafka写请求;再考虑到partition数量,推测应该是业务采用了kafka producer的同步模式,每条消息都触发一次kafka写请求。
解决方法有两种:
- 通过在kafka producer config中配置
producer.type=async
来使用异步发送模式。该模式下client会把消息先放到一个queue中,后台的发送线程会从queue中取出消息,以batch(默认200条)的方式发送到kafka。这种方式提高了吞吐,妥协了时效性(可以配置最大发送间隔,默认5000ms),适合数据量比较大,对延迟不敏感的业务。 - 依旧采用默认的同步方式,不过client需要把要发送的消息先缓存到buffer中,然后调用send接口。其实send接口的参数是可变参数,接收的是message列表,
def send(messages: KeyedMessage[K, V]*): Unit
;但有一些用户不注意,会把自己集合中的一批消息逐条的调用send,给kafka后端带来QPS压力。
- 错误示例
val messages = Seq("hello", "world")
val properties = new Properties()
// custom properties here.
val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))
messages.foreach{m =>
val keyedMessage = new KeyedMessage[String, String]("topic", null, m)
kafkaProducer.send(keyedMessage)
}
- 正确示例
val messages = Seq("hello", "world")
val properties = new Properties()
// custom properties here.
val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))
val keyedMessages = messages.map(m => new KeyedMessage[String, String]("topic", null, m))
kafkaProducer.send(keyedMessages: _*)
当然,增加topic partition数量也能在一定程度上缓解问题,因为不同partition之间的写请求是不互斥的,但这种方式更像是治标不治本,掩盖了根本问题。
总结
合理地发送网络请求在分布式系统中非常重要,为了提高效率,通常在权衡时效性和吞吐的情况下,以“聚少为多”的方式发送批量的请求。过多的小请求不仅会降低吞吐,还可能会压垮后端的服务。
当然,作为服务提供方,需要通过多租户、限流等方式,避免不正常使用的场景把服务压垮。
网友评论