美文网首页
StreamSets update和delete分离之后导致的操

StreamSets update和delete分离之后导致的操

作者: 熊_看不见 | 来源:发表于2018-11-11 20:45 被阅读0次

    现象:

    当期的操作流程如下图:


    clipboard.png

    这样的处理流程会造成如下问题:

    1、当binlog解析出的批次数据中,数据包含了对同一条数据的删除和修改操作时,无法保证操作执行的顺序。

    解决方案(针对kudu的Destination):
    Kudu的Destination中有个设置Default Operation ,这个设置的说明是:

    default operation to perform if sdc.operation.type is not set in record header.

    所以我们可以通过Record的Header Attribute 中的sdc.opeation.type来直接控制数据在kudu的Destination中执行的操作。
    对数据的操作不进行分离,通过sdc.operation.type加入的Record的Header Attributes中进行控制。
    sdc.operation.type的值的说明如下:

    • INSERT_CODE = 1;
    • DELETE_CODE = 2;
    • UPDATE_CODE = 3;
    • UPSERT_CODE = 4;

    在Javascript Evaluator 中的JS中增加如下的代码:

    for(var i = 0; i < records.length; i++) {
    try {
      var newRecord = sdcFunctions.createRecord(true);
      var attributes = records[i].attributes
      if(records[i].value['Type'] =='DELETE'){
        newRecord.attributes['sdc.operation.type']='2';
        newRecord.value = records[i].value['OldData'];
      }else{
        newRecord.attributes['sdc.operation.type']='4'
        newRecord.value = records[i].value['Data'];
      }
      newRecord.value.Type = records[i].value['Type'];
      newRecord.value.Database = records[i].value['Database'];
      newRecord.value.Table = records[i].value['Table'];
      output.write(newRecord);
    } catch (e) {
      // Send record to error
      error.write(records[i], e);
      }
    }
    ``

    相关文章

      网友评论

          本文标题:StreamSets update和delete分离之后导致的操

          本文链接:https://www.haomeiwen.com/subject/pzjffqtx.html