美文网首页
Flink写入ES偶发性数据错误

Flink写入ES偶发性数据错误

作者: 小陈阿飞 | 来源:发表于2018-10-18 17:40 被阅读238次

    背景:公司业务,订单系统生成订单后,加埋点日志,将日志数据推送到kafka,通过Flink消费kafka消息,Flink按照一分钟为单位滚动窗口的方式,对订单数据计算统计,最后将一分钟统计数据存入ES。

    本地测试没有发现问题,部署到线上环境。发现统计数据存在丢单情况。经过几天的观察发现,一直存在对不上。
    解决思路:开始以为是Flink 本身并发造成数据脏读问题。所以在timeWindow处理里面加了锁,发现还是不对。又怀疑ElasticsearchSink写入数据的数据存在并发。打印日志发现存储并不是并行处理。经过几天的努力,终于发现是在更新到ES的时候出问题。
    处理结果如下,用乐观锁方式更新,更新的时候加上vesion,ES数据有版本的概念。

    Client client = EsClientUtil.getInstance().getClient();
    //根据id查询ES数据
    GetResponse gResponse = client.prepareGet(payIndex, payType, payInfo.getId()).execute().actionGet();
            if (gResponse.isExists()) {
                json = gResponse.getSource();
                updatePayMqMonitorParameters(payInfo, json);
                long version = gResponse.getVersion();
                return Requests.indexRequest().index(payIndex).type(payType).id(payInfo.getId()).version(version).source(json);
            } else {
                ......
               //如果数据不存在versionType用EXTERNAL,表示当前更新version要比库中数据version大。默认为相等才更新成功
                return Requests.indexRequest().index(payIndex).type(payType).id(payInfo.getId()).versionType(VersionType.EXTERNAL).version(1L).source(json);
            }
    

    然后再在失败处理逻辑加上错误处理

    //实现ActionRequestFailureHandler,以指定如何处理请求失败的情况
    ActionRequestFailureHandler actionRequestFailureHandler = new PriceActionRequestFailureHandler();
     // 根据conf、addresses、sinkFunction构建ElasticsearchSink实例
    return new ElasticsearchSink<>(getESClusterParam(), getInetSocketAddress(), elasticsearchSinkFunction, actionRequestFailureHandler);
    
    //VersionConflictEngineException异常为ES更新版本冲突异常
    if (ExceptionUtils.findThrowable(failure, VersionConflictEngineException.class).isPresent()) {
                // ES更新版本冲突 错误
                IndexRequest indexAction = (IndexRequest)action;
                String index = indexAction.index();
                String type = indexAction.type();
                String id = indexAction.id();
                Map<String, Object> json = indexAction.sourceAsMap();
                Client client = EsClientUtil.getInstance().getClient();
                GetResponse gResponse = client.prepareGet(index, type, id).execute().actionGet();
                long version = gResponse.getVersion();
                IndexRequest newIndex = Requests.indexRequest().index(index).type(type).version(version).id(id).source(json);
                    indexer.add(newIndex);
                }
            }
    

    这样当出现处理错误后,有异常处理机制,就能保证数据的正确性。

    相关文章

      网友评论

          本文标题:Flink写入ES偶发性数据错误

          本文链接:https://www.haomeiwen.com/subject/kweszftx.html