美文网首页canal
canal源码解析 es adapter sql 字段解惑

canal源码解析 es adapter sql 字段解惑

作者: pcgreat | 来源:发表于2020-01-03 16:41 被阅读0次

    当使用 cannal adapter 时候 ,esMapping sql 使我感到疑惑 ,binlog 消息 通过这个sql 做了聚合 。 但是 下面这个sql 不是做了查询所有数据? 然后在同步到es中去的?

    dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
    destination: example            # cannal的instance或者MQ的topic
    esMapping:
      _index: mytest_user           # es 的索引名称
      _type: _doc                   # es 的doc名称
      _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
    #  pk: id                       # 如果不需要_id, 则需要指定一个属性为主键属性
      # sql映射
      sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
            a.c_time as _c_time, c.labels as _labels from user a
            left join role b on b.id=a.role_id
            left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
            group by user_id) c on c.user_id=a.id"
    #  objFields:
    #    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
    #    _obj: obj:{"test":"123"}
      etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
      commitBatch: 3000                         # 提交批大小
    

    一路 从 CanalAdapterLoader , CanalAdapterWorker ,AbstractCanalAdapterWorker,ESAdapter ,最终在 ESSyncService sync 方法找到了答案 , 阿里同学的代码 写的还是很容易看懂的 。 注意下面方法 注释

       // dml ,binlog msg 转化而成  , config 即是你配置的Es config 文件的配置类
       // binlog msg 最终转化为 对应的es insert , update , delete 操作
       public void sync(ESSyncConfig config, Dml dml) {
            try {
                // 如果是按时间戳定时更新则返回
                if (config.getEsMapping().isSyncByTimestamp()) {
                    return;
                }
    
                long begin = System.currentTimeMillis();
    
                String type = dml.getType();
                if (type != null && type.equalsIgnoreCase("INSERT")) {
                    insert(config, dml);
                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                    update(config, dml);
                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                    delete(config, dml);
                } else {
                    return;
                }
    
                if (logger.isTraceEnabled()) {
                    logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
                        (System.currentTimeMillis() - begin),
                        dml.getDestination(),
                        config.getEsMapping().get_index());
                }
            } catch (Throwable e) {
                logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
                throw new RuntimeException(e);
            }
        }
    

    insert 方法 , 如果 esMapping sql 是单表 所有字段都为简单字段且 binlog msg 对应 sql 主表 ,会调用 singleTableSimpleFiledInsert ,直接插入 es 对应索引中去 ,如果binlog msg 对应的是 sql 主表且 非简单字段 ,则 会从esMapping 拿到主键(列表) 然后 和sql 中 , msg匹配后 拼接后 ,查询 sql , 插入es ,如果 binlog msg 对应的是sql 的 从表 ,有3种更新方法 ,有点小看阿里同学了 ,这块设计,我之前没想到 。

    
        /**
         * 插入操作dml
         *
         * @param config es配置
         * @param dml dml数据
         */
        private void insert(ESSyncConfig config, Dml dml) {
            List<Map<String, Object>> dataList = dml.getData();
            if (dataList == null || dataList.isEmpty()) {
                return;
            }
            SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
            for (Map<String, Object> data : dataList) {
                if (data == null || data.isEmpty()) {
                    continue;
                }
    
                if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
                    // ------单表 & 所有字段都为简单字段------
                    singleTableSimpleFiledInsert(config, dml, data);
                } else {
                    // ------是主表 查询sql来插入------
                    if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
                        mainTableInsert(config, dml, data);
                    }
    
                    // 从表的操作
                    for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
                        if (tableItem.isMain()) {
                            continue;
                        }
                        if (!tableItem.getTableName().equals(dml.getTable())) {
                            continue;
                        }
                        // 关联条件出现在主表查询条件是否为简单字段
                        boolean allFieldsSimple = true;
                        for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                            if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
                                allFieldsSimple = false;
                                break;
                            }
                        }
                        // 所有查询字段均为简单字段
                        if (allFieldsSimple) {
                            // 不是子查询
                            if (!tableItem.isSubQuery()) {
                                // ------关联表简单字段插入------
                                Map<String, Object> esFieldData = new LinkedHashMap<>();
                                for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
                                    Object value = esTemplate.getValFromData(config.getEsMapping(),
                                        data,
                                        fieldItem.getFieldName(),
                                        fieldItem.getColumn().getColumnName());
                                    esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
                                }
    
                                joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
                            } else {
                                // ------关联子表简单字段插入------
                                subTableSimpleFieldOperation(config, dml, data, null, tableItem);
                            }
                        } else {
                            // ------关联子表复杂字段插入 执行全sql更新es------
                            wholeSqlOperation(config, dml, data, null, tableItem);
                        }
                    }
                }
            }
        }
    
    

    相关文章

      网友评论

        本文标题:canal源码解析 es adapter sql 字段解惑

        本文链接:https://www.haomeiwen.com/subject/opizoctx.html