kafka marking the coordinator (i

Author: 邵红晓 | Published 2021-10-22 13:46

Problem

A Flink job consumes from Kafka with automatic offset commits enabled.
kafka-client 0.11.0.2
kafka-broker 1.1.1
Every so often the consumer logs the following message:
marking the coordinator (id rack null) dead for group

Analysis

Reading the client source shows that the message is logged by coordinatorDead() in
org.apache.kafka.clients.consumer.internals.AbstractCoordinator

    protected synchronized void coordinatorDead() {
        if (this.coordinator != null) {
            log.info("Marking the coordinator {} dead for group {}", this.coordinator, this.groupId);
            this.client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE);
            this.coordinator = null;
        }
    }

coordinatorDead() is called from the main loop of HeartbeatThread.run():
    while (true) {
        synchronized (AbstractCoordinator.this) {
            if (this.closed) {
                return;
            }

            if (!this.enabled) {
                AbstractCoordinator.this.wait();
            } else if (AbstractCoordinator.this.state != AbstractCoordinator.MemberState.STABLE) {
                this.disable();
            } else {
                AbstractCoordinator.this.client.pollNoWakeup();
                long now = AbstractCoordinator.this.time.milliseconds();
                if (AbstractCoordinator.this.coordinatorUnknown()) {
                    // Coordinator not known yet: look it up, back off if the lookup is pending or failed
                    if (AbstractCoordinator.this.findCoordinatorFuture != null || AbstractCoordinator.this.lookupCoordinator().failed()) {
                        AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                    }
                } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                    // No heartbeat response within the session timeout: this branch logs the message
                    AbstractCoordinator.this.coordinatorDead();
                } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                    // poll() was not called in time, so the consumer leaves the group
                    AbstractCoordinator.this.maybeLeaveGroup();
                } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                    AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                } else {
                    AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                    AbstractCoordinator.this.sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                        public void onSuccess(Void value) {
                            synchronized (AbstractCoordinator.this) {
                                AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                            }
                        }

                        public void onFailure(RuntimeException e) {
                            synchronized (AbstractCoordinator.this) {
                                if (e instanceof RebalanceInProgressException) {
                                    // A rebalance in progress still counts as a live session
                                    AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                } else {
                                    AbstractCoordinator.this.heartbeat.failHeartbeat();
                                    AbstractCoordinator.this.notify();
                                }
                            }
                        }
                    });
                }
            }
        }
    }

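The source shows the cause: the client times out talking to the Kafka broker that acts as group coordinator. When no heartbeat response arrives within the session timeout, sessionTimeoutExpired(now) returns true and the heartbeat thread calls coordinatorDead(), which logs the message and forces a new coordinator lookup.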
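
To make the timeout condition concrete, here is a minimal Java sketch of the bookkeeping behind sessionTimeoutExpired(now). It is a simplified model, not the Kafka class itself: the class name HeartbeatSketch, its fields, and the 10-second timeout are assumptions chosen for illustration.

```java
/** Simplified model of the consumer's heartbeat bookkeeping; not the actual Kafka class. */
final class HeartbeatSketch {
    private final long sessionTimeoutMs;
    private final long lastSessionResetMs;   // when the session was (re)started
    private long lastHeartbeatReceiveMs;     // when the last heartbeat response arrived

    HeartbeatSketch(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastSessionResetMs = nowMs;
    }

    /** Records a successful heartbeat response. */
    void receiveHeartbeat(long nowMs) {
        lastHeartbeatReceiveMs = nowMs;
    }

    /** True when neither a session reset nor a heartbeat response happened within the timeout. */
    boolean sessionTimeoutExpired(long nowMs) {
        return nowMs - Math.max(lastSessionResetMs, lastHeartbeatReceiveMs) > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        HeartbeatSketch hb = new HeartbeatSketch(10_000L, 0L); // assumed 10 s session timeout
        hb.receiveHeartbeat(3_000L);                           // last response at t = 3 s
        System.out.println(hb.sessionTimeoutExpired(12_000L)); // 9 s of silence
        System.out.println(hb.sessionTimeoutExpired(13_001L)); // just over 10 s of silence
    }
}
```

In this model the coordinator would be marked dead only on the second check, once the silence exceeds the session timeout. That is why a longer session timeout makes the message rarer, at the cost of slower detection of a coordinator that really is gone.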

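Solution

The exact wording of this message comes from the 0.11.0.2 client source examined above, so one option is to upgrade the client.
The other is to lengthen the session timeout and the heartbeat interval, and to raise the reconnect backoff from its 50 ms default to 3 s: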

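    import java.util.Properties

    // Assumed: prop is the Properties object handed to the Flink Kafka consumer
    val prop = new Properties()
    // Session and heartbeat: tolerate longer gaps before the coordinator is marked dead
    prop.setProperty("session.timeout.ms", "300000")
    prop.setProperty("heartbeat.interval.ms", "100000")
    // Retry and reconnect backoff
    prop.setProperty("retry.backoff.ms", "3000")
    prop.setProperty("reconnect.backoff.ms", "3000")
    prop.setProperty("reconnect.backoff.max.ms", "5000")
    // Kept larger than session.timeout.ms and fetch.max.wait.ms
    prop.setProperty("request.timeout.ms", "400000")
    prop.setProperty("fetch.max.wait.ms", "5000")
    // Automatic offset commits and poll limits
    prop.setProperty("enable.auto.commit", "true")
    prop.setProperty("max.poll.records", "200")
    prop.setProperty("auto.commit.interval.ms", "30000")
    prop.setProperty("max.poll.interval.ms", "600000")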
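
These timeouts constrain each other, so it can help to check them before deploying. The Java sketch below is a hypothetical helper, not part of Kafka or Flink. It encodes three rules of thumb: the heartbeat interval is at most one third of the session timeout, the request timeout exceeds both the session timeout and the fetch wait (older consumer clients reject configurations that violate this), and the session timeout lies within the broker's allowed range. The class name ConsumerTimeoutCheck and the broker bounds of 6 s and 300 s are assumptions; confirm group.min.session.timeout.ms and group.max.session.timeout.ms on your own brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative sanity check for consumer timeout settings; not part of Kafka. */
final class ConsumerTimeoutCheck {
    // Assumed broker bounds (group.min.session.timeout.ms / group.max.session.timeout.ms).
    // Verify the real values in your broker configuration.
    static final long ASSUMED_BROKER_MIN_SESSION_MS = 6_000L;
    static final long ASSUMED_BROKER_MAX_SESSION_MS = 300_000L;

    /** Reads a required millisecond setting; throws NumberFormatException if it is missing. */
    static long ms(Properties p, String key) {
        return Long.parseLong(p.getProperty(key));
    }

    /** Returns a description of every rule the settings violate; empty means none. */
    static List<String> check(Properties p) {
        List<String> problems = new ArrayList<>();
        long session = ms(p, "session.timeout.ms");
        long heartbeat = ms(p, "heartbeat.interval.ms");
        long request = ms(p, "request.timeout.ms");
        long fetchWait = ms(p, "fetch.max.wait.ms");

        if (heartbeat * 3 > session) {
            problems.add("heartbeat.interval.ms should be at most one third of session.timeout.ms");
        }
        if (request <= session || request <= fetchWait) {
            problems.add("request.timeout.ms should exceed session.timeout.ms and fetch.max.wait.ms");
        }
        if (session < ASSUMED_BROKER_MIN_SESSION_MS || session > ASSUMED_BROKER_MAX_SESSION_MS) {
            problems.add("session.timeout.ms is outside the assumed broker bounds");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("session.timeout.ms", "300000");
        p.setProperty("heartbeat.interval.ms", "100000");
        p.setProperty("request.timeout.ms", "400000");
        p.setProperty("fetch.max.wait.ms", "5000");
        // Prints the list of violated rules for the settings used in this article
        System.out.println(check(p));
    }
}
```

With the values from this article the three checks pass, but only just: the session timeout sits exactly at the assumed broker maximum, and the heartbeat interval is exactly one third of it.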

Article link: https://www.haomeiwen.com/subject/ylgcaltx.html