美文网首页程序员Apache Kafka
kafka生产者使用不当导致的应用挂起/夯住

kafka生产者使用不当导致的应用挂起/夯住

作者: sheen口开河 | 来源:发表于2018-12-06 20:53 被阅读1次

    [TOC]

    1. 背景和现象

    1.1 kafka版本和部署状态

    kafka版本

    server和client都是0.11.0

    部署状态

    kafka多个节点(具体多少不清楚,但是肯定不是单节点),zookeeper3个节点。topic的分区副本数为2。具备高可用。

    1.2 事件现象

    在一次生产事件中,其中一个kafka节点和zk节点因物理机宕机下线,zk和kafka broker恢复后,生产者应用并没有恢复,最终无法发送消息。

    此时生产者端的应用业务流程无法继续执行,流程走到producer模块就被Block住,然后每隔10s报错一次。

    重启producer之后,应用恢复。

    关键日志

    2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce:  fail at seco
    nd time.   
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
            at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
            at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
            at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48)
            at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27)
            at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187)
            at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
    

    1.3 生产者代码和配置

    生产者代码

        @Override
        public DataRecord doOnce(Record record) {
            // TODO Auto-generated method stub
            try {
                OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE);
                final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record);
                producer.send(proRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // TODO Auto-generated method stub
                        // 发送失败
                        if (exception != null) {
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN);
                            logger.warn("producer send fail and resend.", exception);
                            try {
                                producer.send(proRecord).get();
                                OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                logger.error("produce:  fail at second time.\t", e);
                                OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                            } catch (ExecutionException e) {
                                // TODO Auto-generated catch block
                                logger.error("produce:  fail at second time.\t", e);
                                OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                            } catch (Exception e) {
                                logger.error("produce:  fail at second time.\t", e);
                                OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
                            }
                        } else {
                            OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
                        }
                    }
                });
                return null;
            } catch (TimeoutException e) {
                logger.error("produce:  fail.\t", e);
                OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
            } catch (Exception e) {
                logger.error("produce:  fail.\t", e);
                OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
            }
            return null;
        }
    

    生产者配置

    bootstrap.servers=${KAFKA_SERVER_IN}
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
    #max time to wait,default to 60s
    max.block.ms=10000
    batch.size=65536
    buffer.memory=134217728
    retries=3
    

    通过上述代码和配置可以看出

    • 最大block事件为1000ms,也就是10s
    • buffer配置的较大,为134M
    • 生产者先是异步发送,如果发送失败,则执行一次同步发送

    2. 问题初步定位和分析

    2.1 kafka生产者简介

    排除服务端的疑点

    在最终定位之前,我们怀疑过很多点,比如是不是kafka高可用存在bug、是不是zk出问题了、是不是kafka选主失败等,最终通过生产的其他应用现象推论以及理论分析得出以下基本结论

    • kafka本身高可用机制还是比较可靠的,宕机1台节点,server的状态可以快速回复正常
    • zookeeper的高可用也没有问题,3个节点的情况下,是允许1个节点下线的,zookeeper服务正常
    • 宕机期间以及恢复后,kafka完成了leader节点的选举

    总的来说,就是不要怀疑服务端有问题。

    当然,“不要怀疑服务端有问题”只是我们定位到了原因之后的后置结论,并不表示故障排查的时候忽略服务端的潜在问题,毕竟不管是硬件资源还是软件质量都可能存在缺陷,尤其是开源产品的发展本身就是一个不断迭代完善的过程。

    关于kafka生产者

    在说原因之前,还需要说明一下kafka的producer流程。kafka生产者发送消息的粗略流程如下:

    • 首先应用调用send发送
    • 消息的KV序列化
    • 根据分区器决定消息发送到那个分区
    • 将消息添加到本地缓冲区,如果缓冲区满,则当前线程block,直到缓冲区有足够的空间或者达到最大阻塞时间(max.block.ms)
    • 有一个独立的IO线程负责从缓冲区中将消息发送到服务端
    • IO线程收到响应之后,通知producer线程完成了发送,如果需要,调用producer指定的回调函数

    注意,从上面的流程我们可以看出,在kafka的高版本客户端(貌似是0.9之后)中,发送消息天然的是一个异步的过程,也就是说,消息发送都是异步方式进行的。而我们如果需要使用同步的方式发送消息,那么我们只能通过KafkaProducer.send返回的Future对象完成,调用Future.get,关键代码如下

    //KafkaProducer.send的方法签名
    //不提供回调
    public Future<RecordMetadata> send(ProducerRecord<K, V> record);
    //提供回调
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
    
    

    当我们调用了Future.get的时候,我们做了什么

    上面出问题的代码,使用了同步的方式等待结果,那么同步的get,到底是什么样的操作呢?

    先来看下KafkaProducer.send返回的具体Future实现

    KafkaProducer.doSend

                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs);
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
                return result.future;
    

    上面的代码中返回的future即FutureRecordMetadata的实例,其实现了Future的get方法

    public final class FutureRecordMetadata implements Future<RecordMetadata> {
    
        @Override
        public RecordMetadata get() throws InterruptedException, ExecutionException {
            //阻塞等待
            this.result.await();
            if (nextRecordMetadata != null)
                return nextRecordMetadata.get();
            return valueOrError();
        }
    
        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            // Handle overflow.
            long now = System.currentTimeMillis();
            long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
            //阻塞等待
            boolean occurred = this.result.await(timeout, unit);
            if (nextRecordMetadata != null)
                return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (!occurred)
                throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
            return valueOrError();
        }
        
    }
    
    

    可以看到,get里通过result.await阻塞等待,再看看这里的result对应的类ProduceRequestResult

    public final class ProduceRequestResult {
        private final CountDownLatch latch = new CountDownLatch(1);
        /**
         * Mark this request as complete and unblock any threads waiting on its completion.
         */
        public void done() {
            if (baseOffset == null)
                throw new IllegalStateException("The method `set` must be invoked before this method.");
            this.latch.countDown();
        }
    
        /**
         * Await the completion of this request
         */
        public void await() throws InterruptedException {
            latch.await();
        }
    }
    

    可以看到,其内部的await中调用了CountDownLatch.await进行等待,同时提供了done方法,解除等待的状态。

    看到这里就比较清晰了,如果应用通过get方式同步等待结果,其内部实现时使用了CountDownLatch的await方法,当结果返回的时候,IO线程会调用done方法结束等待状态,并且返回结果。我们前面的分析只介绍了如何等待的,至于如何唤醒,将在下文介绍。

    2.2 问题定位

    通过上面的代码分析,我们几乎可以猜测到问题的出现可能和这里的设计有关——调用了get阻塞等待,但是由于某种原因,导致没有人唤醒等待着的线程。

    为了进一步验证我们的想法,在开发环境复现生产事件的情况,当出现上述现象时,通过jstack抓一下线程快照,进一步证实了我们的猜想:

    "kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
            at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
            at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
            at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
            at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
            at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60)
            at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
            at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
            at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599)
            at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575)
            at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539)
            at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474)
            at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
            at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660)
            at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
            at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
            at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
            at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
            at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
            at java.lang.Thread.run(Thread.java:745)
    

    这里是开发环境复现的代码执行情况,因此Producer代码和生产上的不完全一致,但是逻辑相同。

    通过线程快照可以发现,IO线程一致处于await状态,因此后续流程无法执行!

    关于CountDownLatch的设计和实现,感兴趣的可以查看相关文档。其实这里说生产者应用被Block,严格意义上来说是不对的,线程状态其实是WAITING,因此这里的Block指的是“代码执行不下去,在当前状态一直堵着”的状态~

    因此,问题定位如下

    • 在异步发送的的回调里使用了同步的方式再次发送,由于kafka producer的同步发送是阻塞等待,且使用的是不带超时时间的无限期等待(future.get()中未指定超时时间),因此当不被唤醒时会一直wai下去
    • kafka生产者的IO线程(实际执行数据发送的线程)是单线程模型,且回调函数是在IO线程中执行的,因此回调函数的阻塞会直接导致IO线程阻塞,于是生产者缓冲区的数据无法被发送
    • kafka生产者还在不断的被应用调用,因此缓冲区一直累积并增大,当缓冲区满的时候,生产者线程会被阻塞,最大阻塞时间为max.block.time,如果改时间到达之后还是无法将数据塞入缓冲区,则会抛出一个异常,因此日志中看到达到10s之后,打印出异常栈
    • 由于使用了get没有指定超时时间,且该await一直无法被唤醒,因此这种情况会一直持续,在没有人工干预的情况下,永远不会发送成功

    生产建议

    • kafka生产者推荐使用异步方式发送,并且提供回调以响应发送成功或者失败
    • 如果需要使用future.get的方式模拟同步发送,则需要在get里加上合适的超时时间,避免因为不可预知的外部因素导致线程无法被唤醒,即使用Future.get(long timeout)的api而不是不带超时参数的Future.get()
    • 不要在异步回调中执行阻塞操作或者耗时比较久的操作,如果有必要可以考虑交给另一个线程(池)去做

    3. Future.get为何没有被唤醒

    在前面的介绍中,我们定位了问题的原因,但也留下了一些疑问:

    • 为何future.get没有被唤醒?
    • producer是何时执行了回调操作的?
    • 这种情况属于应用使用不当还是kafka的bug?

    3.1 HOW:分析思路

    想要彻底弄清楚这个问题,恐怕要去好好撸一撸kafka producer的源码了。由于kafka producer的代码非常多,其中有缓冲区操作模块、IO执行模块、元数据更新模块、事务支持模块等很多设计,这里就只从这次的事件问题切入分析,后面如果对kafka producer源码全面分析了之后再专门用几篇文章描述。

    那么思路很简单,主要从以下几个方面入手

    • 上一节中我们说到,造成wait的原因就是调用了CountDownLatch的await方法,那么何处调用了CountDownLatch的countdown方法?
    • 在所有调用了CountDownLatch.countdown的地方,是否包含了对kafka节点下线的处理?也就是说,难道kafka节点下线之后,流程就不会走到countdown了吗?

    为了弄清楚以上两个问题,我们先去看看源码。通过对ProduceRequestResult的成员变量CountDownLatch latch分析可以知道,修改其状态的方法只有2个await方法和一个done方法

        /**
         * Mark this request as complete and unblock any threads waiting on its completion.
         */
        public void done() {
            if (baseOffset == null)
                throw new IllegalStateException("The method `set` must be invoked before this method.");
            this.latch.countDown();
        }
    
        /**
         * Await the completion of this request
         */
        public void await() throws InterruptedException {
            latch.await();
        }
    
        /**
         * Await the completion of this request (up to the given time interval)
         * @param timeout The maximum time to wait
         * @param unit The unit for the max time
         * @return true if the request completed, false if we timed out
         */
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    

    另外还有一个completed方法只是读取状态,不是修改,这里就忽略了

    其中两个await方法一个是有时间参数的一个是没有的,对应Future.get()和Future.get(long timeout),是导致阻塞的入口,因此也不用考虑,那么重点就是这个done方法了。

    3.2 WHEN:谁调用了done/什么场景下会正常唤醒

    通过eclipse提供的工具,可以一层一层追踪出,有哪些地方调用了这个done,基本结论如下,

    在Producer中 ,主要有设计到两个逻辑(两个类),其中

    • Sender,主要对于异常情况做做一些处理,以唤醒await的线程,包括
      • 当链接被强制关闭时
      • 当事务管理器中认为需要丢弃时
      • 当有过期的数据时
    • NetworkClient,主要是处理发送结果,包括
      • 当发送后返回失败时
      • 当返回消息太大需要切分的时候
      • 当发送成功的时候

    相应的逻辑和流程可以看具体的源码

    Sender中相关逻辑流程图如下

    Untitled Diagram.png

    NetworkClient中相关逻辑

    NetworClient中调用done的地方.png

    安利一个良心在线制图网站 https://www.draw.io/

    3.3 WHY:为何会一直wait却没有被唤醒

    通过上面的分析,我们梳理了解除线程阻塞(WAIT)的几个场景和时机,然而不幸的是,上面的场景均没有机会被执行:

    • 在kafka节点宕机时,同步发送操作的message依然会被加入到生产者缓冲区,因为加入到缓冲区的过程和链路情况是解耦的,因此可以成功被塞到buffer
    • 由于是同步的过程,因此塞到buffer之后,发送者便开始了get()的无限期等待,直到有“人”唤醒
    • 通过上面的分析我们发现:唤醒该同步等待的操作,都需要在Sender也就是IO线程中执行:要么是由于各种原因觉得这个消息需要abort,要么是收到了正确或者错误的应答(fail or complete or split)。
    • 此时奇妙的现象就发生了:同步等待的操作在IO线程,唤醒的操作也是在IO线程,这是同一个线程!也就是说,此刻已经发生了某种意义的“死锁
    • IO线程已经被无限WAITing了,因此buffer中的数据再也无法被发送
    • 于是buffer越堆越多,直到达到buffer sizez之后,开始被block
    • producer对block进行了控制,每次最大block的时间为max.block.time,然后向上抛出一个异常,于是出现了日志中的现象

    综上,这次生产实践的原委基本清楚了。关于producer源码中的细节,后面再细细研读~

    相关文章

      网友评论

        本文标题:kafka生产者使用不当导致的应用挂起/夯住

        本文链接:https://www.haomeiwen.com/subject/wnjacqtx.html