美文网首页程序员Apache Kafka
kafka生产者使用不当导致的应用挂起/夯住

kafka生产者使用不当导致的应用挂起/夯住

作者: sheen口开河 | 来源:发表于2018-12-06 20:53 被阅读1次

[TOC]

1. 背景和现象

1.1 kafka版本和部署状态

kafka版本

server和client都是0.11.0

部署状态

kafka多个节点(具体多少不清楚,但是肯定不是单节点),zookeeper3个节点。topic的分区副本数为2。具备高可用。

1.2 事件现象

在一次生产事件中,其中一个kafka节点和zk节点因物理机宕机下线,zk和kafka broker恢复后,生产者应用并没有恢复,最终无法发送消息。

此时生产者端的应用业务流程无法继续执行,流程走到producer模块就被Block住,然后每隔10s报错一次。

重启producer之后,应用恢复。

关键日志

2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce:  fail at seco
nd time.   
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48)
        at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27)
        at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187)
        at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.

1.3 生产者代码和配置

生产者代码

    @Override
    public DataRecord doOnce(Record record) {
        // TODO Auto-generated method stub
        try {
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE);
            final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record);
            producer.send(proRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // TODO Auto-generated method stub
                    // 发送失败
                    if (exception != null) {
                        OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN);
                        logger.warn("producer send fail and resend.", exception);
                        try {
                            producer.send(proRecord).get();
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        } catch (ExecutionException e) {
                            // TODO Auto-generated catch block
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        } catch (Exception e) {
                            logger.error("produce:  fail at second time.\t", e);
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                        }
                    } else {
                        OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                    }
                }
            });
            return null;
        } catch (TimeoutException e) {
            logger.error("produce:  fail.\t", e);
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
        } catch (Exception e) {
            logger.error("produce:  fail.\t", e);
            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
        }
        return null;
    }

生产者配置

bootstrap.servers=${KAFKA_SERVER_IN}
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
#max time to wait,default to 60s
max.block.ms=10000
batch.size=65536
buffer.memory=134217728
retries=3

通过上述代码和配置可以看出

  • 最大block事件为1000ms,也就是10s
  • buffer配置的较大,为134M
  • 生产者先是异步发送,如果发送失败,则执行一次同步发送

2. 问题初步定位和分析

2.1 kafka生产者简介

排除服务端的疑点

在最终定位之前,我们怀疑过很多点,比如是不是kafka高可用存在bug、是不是zk出问题了、是不是kafka选主失败等,最终通过生产的其他应用现象推论以及理论分析得出以下基本结论

  • kafka本身高可用机制还是比较可靠的,宕机1台节点,server的状态可以快速回复正常
  • zookeeper的高可用也没有问题,3个节点的情况下,是允许1个节点下线的,zookeeper服务正常
  • 宕机期间以及恢复后,kafka完成了leader节点的选举

总的来说,就是不要怀疑服务端有问题。

当然,“不要怀疑服务端有问题”只是我们定位到了原因之后的后置结论,并不表示故障排查的时候忽略服务端的潜在问题,毕竟不管是硬件资源还是软件质量都可能存在缺陷,尤其是开源产品的发展本身就是一个不断迭代完善的过程。

关于kafka生产者

在说原因之前,还需要说明一下kafka的producer流程。kafka生产者发送消息的粗略流程如下:

  • 首先应用调用send发送
  • 消息的KV序列化
  • 根据分区器决定消息发送到那个分区
  • 将消息添加到本地缓冲区,如果缓冲区满,则当前线程block,直到缓冲区有足够的空间或者达到最大阻塞时间(max.block.ms)
  • 有一个独立的IO线程负责从缓冲区中将消息发送到服务端
  • IO线程收到响应之后,通知producer线程完成了发送,如果需要,调用producer指定的回调函数

注意,从上面的流程我们可以看出,在kafka的高版本客户端(貌似是0.9之后)中,发送消息天然的是一个异步的过程,也就是说,消息发送都是异步方式进行的。而我们如果需要使用同步的方式发送消息,那么我们只能通过KafkaProducer.send返回的Future对象完成,调用Future.get,关键代码如下

//KafkaProducer.send的方法签名
//不提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
//提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

当我们调用了Future.get的时候,我们做了什么

上面出问题的代码,使用了同步的方式等待结果,那么同步的get,到底是什么样的操作呢?

先来看下KafkaProducer.send返回的具体Future实现

KafkaProducer.doSend

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;

上面的代码中返回的future即FutureRecordMetadata的实例,其实现了Future的get方法

public final class FutureRecordMetadata implements Future<RecordMetadata> {

    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        //阻塞等待
        this.result.await();
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get();
        return valueOrError();
    }

    @Override
    public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Handle overflow.
        long now = System.currentTimeMillis();
        long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
        //阻塞等待
        boolean occurred = this.result.await(timeout, unit);
        if (nextRecordMetadata != null)
            return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        if (!occurred)
            throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
        return valueOrError();
    }
    
}

可以看到,get里通过result.await阻塞等待,再看看这里的result对应的类ProduceRequestResult

public final class ProduceRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }
}

可以看到,其内部的await中调用了CountDownLatch.await进行等待,同时提供了done方法,解除等待的状态。

看到这里就比较清晰了,如果应用通过get方式同步等待结果,其内部实现时使用了CountDownLatch的await方法,当结果返回的时候,IO线程会调用done方法结束等待状态,并且返回结果。我们前面的分析只介绍了如何等待的,至于如何唤醒,将在下文介绍。

2.2 问题定位

通过上面的代码分析,我们几乎可以猜测到问题的出现可能和这里的设计有关——调用了get阻塞等待,但是由于某种原因,导致没有人唤醒等待着的线程。

为了进一步验证我们的想法,在开发环境复现生产事件的情况,当出现上述现象时,通过jstack抓一下线程快照,进一步证实了我们的猜想:

"kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474)
        at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
        at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at java.lang.Thread.run(Thread.java:745)

这里是开发环境复现的代码执行情况,因此Producer代码和生产上的不完全一致,但是逻辑相同。

通过线程快照可以发现,IO线程一致处于await状态,因此后续流程无法执行!

关于CountDownLatch的设计和实现,感兴趣的可以查看相关文档。其实这里说生产者应用被Block,严格意义上来说是不对的,线程状态其实是WAITING,因此这里的Block指的是“代码执行不下去,在当前状态一直堵着”的状态~

因此,问题定位如下

  • 在异步发送的的回调里使用了同步的方式再次发送,由于kafka producer的同步发送是阻塞等待,且使用的是不带超时时间的无限期等待(future.get()中未指定超时时间),因此当不被唤醒时会一直wai下去
  • kafka生产者的IO线程(实际执行数据发送的线程)是单线程模型,且回调函数是在IO线程中执行的,因此回调函数的阻塞会直接导致IO线程阻塞,于是生产者缓冲区的数据无法被发送
  • kafka生产者还在不断的被应用调用,因此缓冲区一直累积并增大,当缓冲区满的时候,生产者线程会被阻塞,最大阻塞时间为max.block.time,如果改时间到达之后还是无法将数据塞入缓冲区,则会抛出一个异常,因此日志中看到达到10s之后,打印出异常栈
  • 由于使用了get没有指定超时时间,且该await一直无法被唤醒,因此这种情况会一直持续,在没有人工干预的情况下,永远不会发送成功

生产建议

  • kafka生产者推荐使用异步方式发送,并且提供回调以响应发送成功或者失败
  • 如果需要使用future.get的方式模拟同步发送,则需要在get里加上合适的超时时间,避免因为不可预知的外部因素导致线程无法被唤醒,即使用Future.get(long timeout)的api而不是不带超时参数的Future.get()
  • 不要在异步回调中执行阻塞操作或者耗时比较久的操作,如果有必要可以考虑交给另一个线程(池)去做

3. Future.get为何没有被唤醒

在前面的介绍中,我们定位了问题的原因,但也留下了一些疑问:

  • 为何future.get没有被唤醒?
  • producer是何时执行了回调操作的?
  • 这种情况属于应用使用不当还是kafka的bug?

3.1 HOW:分析思路

想要彻底弄清楚这个问题,恐怕要去好好撸一撸kafka producer的源码了。由于kafka producer的代码非常多,其中有缓冲区操作模块、IO执行模块、元数据更新模块、事务支持模块等很多设计,这里就只从这次的事件问题切入分析,后面如果对kafka producer源码全面分析了之后再专门用几篇文章描述。

那么思路很简单,主要从以下几个方面入手

  • 上一节中我们说到,造成wait的原因就是调用了CountDownLatch的await方法,那么何处调用了CountDownLatch的countdown方法?
  • 在所有调用了CountDownLatch.countdown的地方,是否包含了对kafka节点下线的处理?也就是说,难道kafka节点下线之后,流程就不会走到countdown了吗?

为了弄清楚以上两个问题,我们先去看看源码。通过对ProduceRequestResult的成员变量CountDownLatch latch分析可以知道,修改其状态的方法只有2个await方法和一个done方法

    /**
     * Mark this request as complete and unblock any threads waiting on its completion.
     */
    public void done() {
        if (baseOffset == null)
            throw new IllegalStateException("The method `set` must be invoked before this method.");
        this.latch.countDown();
    }

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }

    /**
     * Await the completion of this request (up to the given time interval)
     * @param timeout The maximum time to wait
     * @param unit The unit for the max time
     * @return true if the request completed, false if we timed out
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

另外还有一个completed方法只是读取状态,不是修改,这里就忽略了

其中两个await方法一个是有时间参数的一个是没有的,对应Future.get()和Future.get(long timeout),是导致阻塞的入口,因此也不用考虑,那么重点就是这个done方法了。

3.2 WHEN:谁调用了done/什么场景下会正常唤醒

通过eclipse提供的工具,可以一层一层追踪出,有哪些地方调用了这个done,基本结论如下,

在Producer中 ,主要有设计到两个逻辑(两个类),其中

  • Sender,主要对于异常情况做做一些处理,以唤醒await的线程,包括
    • 当链接被强制关闭时
    • 当事务管理器中认为需要丢弃时
    • 当有过期的数据时
  • NetworkClient,主要是处理发送结果,包括
    • 当发送后返回失败时
    • 当返回消息太大需要切分的时候
    • 当发送成功的时候

相应的逻辑和流程可以看具体的源码

Sender中相关逻辑流程图如下

Untitled Diagram.png

NetworkClient中相关逻辑

NetworClient中调用done的地方.png

安利一个良心在线制图网站 https://www.draw.io/

3.3 WHY:为何会一直wait却没有被唤醒

通过上面的分析,我们梳理了解除线程阻塞(WAIT)的几个场景和时机,然而不幸的是,上面的场景均没有机会被执行:

  • 在kafka节点宕机时,同步发送操作的message依然会被加入到生产者缓冲区,因为加入到缓冲区的过程和链路情况是解耦的,因此可以成功被塞到buffer
  • 由于是同步的过程,因此塞到buffer之后,发送者便开始了get()的无限期等待,直到有“人”唤醒
  • 通过上面的分析我们发现:唤醒该同步等待的操作,都需要在Sender也就是IO线程中执行:要么是由于各种原因觉得这个消息需要abort,要么是收到了正确或者错误的应答(fail or complete or split)。
  • 此时奇妙的现象就发生了:同步等待的操作在IO线程,唤醒的操作也是在IO线程,这是同一个线程!也就是说,此刻已经发生了某种意义的“死锁
  • IO线程已经被无限WAITing了,因此buffer中的数据再也无法被发送
  • 于是buffer越堆越多,直到达到buffer sizez之后,开始被block
  • producer对block进行了控制,每次最大block的时间为max.block.time,然后向上抛出一个异常,于是出现了日志中的现象

综上,这次生产实践的原委基本清楚了。关于producer源码中的细节,后面再细细研读~

相关文章

  • kafka生产者使用不当导致的应用挂起/夯住

    [TOC] 1. 背景和现象 1.1 kafka版本和部署状态 kafka版本 server和client都是0....

  • Kafka 生产者概述

    生产者:往消息队列里推送消息的应用 发送消息的过程 Kafka 生产者发送消息的过程: Kafka 会将发送消息包...

  • 第三讲-消息队列

    1.kafka知识点 1.1 生产者 1.提高生产者性能,可以使用异步发送。生产者宕机会导致消息丢失,需要生成者有...

  • kafka0.8

    1、Kafka分为:生产者(producer),消费者(consumer) 2、生产者提交消息,给Kafka集群,...

  • Kafka - 生产者初步学习

    Kafka - 生产者初步学习 一、kafka生产者组件 我们从创建一个 ProducerRecord 对象开始,...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • [kafka系列]之producer端消息发送

    本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • Kafka消费者

    1 消费者概念 1.1 消费者与消费者组 应用程序--->kafka--->应用程序 生产者 主题 消费者...

网友评论

    本文标题:kafka生产者使用不当导致的应用挂起/夯住

    本文链接:https://www.haomeiwen.com/subject/wnjacqtx.html