背景:公司业务,订单系统生成订单后,加埋点日志,将日志数据推送到kafka,通过Flink消费kafka消息,Flink按照一分钟为单位滚动窗口的方式,对订单数据计算统计,最后将一分钟统计数据存入ES。
本地测试没有发现问题,部署到线上环境。发现统计数据存在丢单情况。经过几天的观察发现,一直存在对不上。
解决思路:开始以为是Flink 本身并发造成数据脏读问题。所以在timeWindow处理里面加了锁,发现还是不对。又怀疑ElasticsearchSink写入数据的数据存在并发。打印日志发现存储并不是并行处理。经过几天的努力,终于发现是在更新到ES的时候出问题。
处理结果如下,用乐观锁方式更新,更新的时候加上vesion,ES数据有版本的概念。
Client client = EsClientUtil.getInstance().getClient();
//根据id查询ES数据
GetResponse gResponse = client.prepareGet(payIndex, payType, payInfo.getId()).execute().actionGet();
if (gResponse.isExists()) {
json = gResponse.getSource();
updatePayMqMonitorParameters(payInfo, json);
long version = gResponse.getVersion();
return Requests.indexRequest().index(payIndex).type(payType).id(payInfo.getId()).version(version).source(json);
} else {
......
//如果数据不存在versionType用EXTERNAL,表示当前更新version要比库中数据version大。默认为相等才更新成功
return Requests.indexRequest().index(payIndex).type(payType).id(payInfo.getId()).versionType(VersionType.EXTERNAL).version(1L).source(json);
}
然后再在失败处理逻辑加上错误处理
//实现ActionRequestFailureHandler,以指定如何处理请求失败的情况
ActionRequestFailureHandler actionRequestFailureHandler = new PriceActionRequestFailureHandler();
// 根据conf、addresses、sinkFunction构建ElasticsearchSink实例
return new ElasticsearchSink<>(getESClusterParam(), getInetSocketAddress(), elasticsearchSinkFunction, actionRequestFailureHandler);
//VersionConflictEngineException异常为ES更新版本冲突异常
if (ExceptionUtils.findThrowable(failure, VersionConflictEngineException.class).isPresent()) {
// ES更新版本冲突 错误
IndexRequest indexAction = (IndexRequest)action;
String index = indexAction.index();
String type = indexAction.type();
String id = indexAction.id();
Map<String, Object> json = indexAction.sourceAsMap();
Client client = EsClientUtil.getInstance().getClient();
GetResponse gResponse = client.prepareGet(index, type, id).execute().actionGet();
long version = gResponse.getVersion();
IndexRequest newIndex = Requests.indexRequest().index(index).type(type).version(version).id(id).source(json);
indexer.add(newIndex);
}
}
这样当出现处理错误后,有异常处理机制,就能保证数据的正确性。
网友评论