背景
Debezium引擎在进行数据同步的时候,如果期间发生了异常问题。会有专门的异常处理类来出来,MySQL异常处理类如下:io.debezium.connector.mysql.MySqlErrorHandler.java
isRetriable方法作用:如果识别到指定的异常信息,引擎会触发定时重启。而对于网络异常,目前1.6.0.Final版本没法识别到,模拟网络端口后,会有如下的异常:
2021-07-27 17:56:26.481 ERROR 8556 --- [30.233.147:3306] io.debezium.pipeline.ErrorHandler : Producer failure
io.debezium.DebeziumException: Network is unreachable: recv failed
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154) ~[debezium-connector-mysql-1.6.0.Final.jar:1.6.0.Final]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1199) [debezium-connector-mysql-1.6.0.Final.jar:1.6.0.Final]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_271]
Caused by: java.net.SocketException: Network is unreachable: recv failed
at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_271]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_271]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_271]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_271]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:222) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.peek(ByteArrayInputStream.java:194) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:930) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
... 3 common frames omitted
当网络恢复的时候,引擎无法正常工作。需要将此异常添加到ErrorHandler类中。
新建异常处理类
在工程项目,手动创建io.debezium.connector.mysql.MySqlErrorHandler类,将SocketException加上去。代码如下:
public class MySqlErrorHandler extends ErrorHandler {
private static final String SQL_CODE_TOO_MANY_CONNECTIONS = "08004";
public MySqlErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
super(MySqlConnector.class, logicalName, queue);
}
@Override
protected boolean isRetriable(Throwable throwable) {
if (throwable instanceof SQLException) {
final SQLException sql = (SQLException) throwable;
return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSQLState());
}
else if (throwable instanceof ServerException) {
final ServerException sql = (ServerException) throwable;
return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSqlState());
}
else if(throwable instanceof SocketException) {
return true;
}
else if (throwable instanceof DebeziumException && throwable.getCause() != null) {
return isRetriable(throwable.getCause());
}
return false;
}
}
这样修改后,启动项目工程。然后模拟网络异常问题,则控制台会打印出重连信息,如下:
但是没有一直打印下去,所以猜测使用的默认配置,需要手动进行配置指定等待时长。
访问github debezium代码库,找到打印的相关类:io.debezium.connector.common.BaseSourceTask.java,阅读代码很容易找到读取配置的地方:
于是在引擎启动的地方,配置此参数。比如设置为半小时。配置是在CommonConnectorConfig.java类中,默认配置为10s。
启动的时候,设置重试等待时间为半小时:
重新启动引擎
然后模拟网络异常问题,发现一直打印消息,需要说明的是,其内部就是个do while循环,循环到指定时间开始重启引擎,可以理解为等待多长时间重启引擎,此机制不能立马感知网络恢复,只是做了个定时重启而已。
网友评论