这种错误一般是由客户端连接异常导致的。
/**
 * Exception raised when an offset commit fails with a retriable error.
 *
 * <p>The default message advises the caller to retry committing the latest
 * consumed offsets. All constructors simply delegate to the matching
 * {@code RetriableException} constructor.
 */
public class RetriableCommitFailedException extends RetriableException {

    private static final long serialVersionUID = 1L;

    /** Message used whenever the caller does not supply one of its own. */
    private static final String DEFAULT_MESSAGE =
        "Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.";

    /**
     * Creates an exception carrying exactly the given message.
     *
     * @param message the detail message
     */
    public RetriableCommitFailedException(String message) {
        super(message);
    }

    /**
     * Creates an exception with the default message and the given cause.
     *
     * @param t the underlying cause
     */
    public RetriableCommitFailedException(Throwable t) {
        super(DEFAULT_MESSAGE, t);
    }

    /**
     * Creates an exception with the given message and cause.
     *
     * @param message the detail message
     * @param t       the underlying cause
     */
    public RetriableCommitFailedException(String message, Throwable t) {
        super(message, t);
    }

    /**
     * Builds an exception whose message is the default text followed by a
     * description of the underlying error.
     *
     * @param additionalMessage text appended after "The underlying error was: "
     * @return a new exception with the combined message and no cause
     */
    public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
        final String combined = DEFAULT_MESSAGE + " The underlying error was: " + additionalMessage;
        return new RetriableCommitFailedException(combined);
    }
}
比如在提交 offset(commit offset)时,会先校验 GroupCoordinator 是否可用:
public void commitOffsetsAsync(final Map offsets,final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
}else {
------------------------------------------------------------
/**
 * Reports whether no coordinator is currently available.
 *
 * @return {@code true} when {@link #checkAndGetCoordinator()} yields {@code null},
 *         {@code false} otherwise
 */
public boolean coordinatorUnknown() {
    if (checkAndGetCoordinator() == null) {
        return true;
    }
    return false;
}
------------------------------------------------------------
/**
 * Returns the current coordinator, or {@code null} if there is none or the
 * client reports it as unavailable.
 *
 * <p>When a coordinator is set but {@code client.isUnavailable} returns true for
 * it, {@code markCoordinatorUnknown(true)} is invoked before returning
 * {@code null}.
 *
 * @return the coordinator node, or {@code null} when it is unset or unavailable
 */
protected synchronized Node checkAndGetCoordinator() {
    // Nothing to validate when no coordinator has been recorded.
    if (this.coordinator == null) {
        return null;
    }

    // A coordinator the client does not report as unavailable is returned as-is.
    if (!client.isUnavailable(this.coordinator)) {
        return this.coordinator;
    }

    // The client reports the coordinator as unavailable: flag it as unknown.
    markCoordinatorUnknown(true);
    return null;
}
----isUnavailable 的判断条件:连接已失败(connectionFailed),并且仍需等待一段时间才能重新发起连接(connectionDelay > 0,即处于重连退避期内)----
return client.connectionFailed(node) &&client.connectionDelay(node,time.milliseconds()) >0;
网友评论