Can the full-import (ETL) path of canal adapter for ES actually be trusted? Let's look at the CommonRest.etl method.
The highlight is syncSwitch, which implements a switch-plus-lock mechanism: before a full import the switch is turned off, so incremental sync for the same task is blocked while adapter.etl(task, paramArray) performs the full import; afterwards the switch is turned back on and the blocked incremental sync is woken up again. (syncSwitch shows off the code craftsmanship of Alibaba's middleware team; a sketch of the switch semantics follows the method below.)
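For context, this method backs the REST endpoint shown below, so a full import for one mapping is triggered with a single HTTP call. The host, port, adapter key, and task file name here are illustrative, and the optional params values are split on ";":

    curl -X POST "http://127.0.0.1:8081/etl/es/exampleKey/mytest_user.yml?params=2021-01-01"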
@PostMapping("/etl/{type}/{key}/{task}")
public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                     @RequestParam(name = "params", required = false) String params) {
    OuterAdapter adapter = loader.getExtension(type, key);
    String destination = adapter.getDestination(task);
    String lockKey = destination == null ? task : destination;

    boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
    if (!locked) {
        EtlResult result = new EtlResult();
        result.setSucceeded(false);
        result.setErrorMessage(task + " 有其他进程正在导入中, 请稍后再试");
        return result;
    }
    try {
        boolean oriSwitchStatus;
        if (destination != null) {
            oriSwitchStatus = syncSwitch.status(destination);
            if (oriSwitchStatus) {
                syncSwitch.off(destination);
            }
        } else {
            // task itself may be the destination; switch off on task directly
            oriSwitchStatus = syncSwitch.status(task);
            if (oriSwitchStatus) {
                syncSwitch.off(task);
            }
        }
        try {
            List<String> paramArray = null;
            if (params != null) {
                paramArray = Arrays.asList(params.trim().split(";"));
            }
            return adapter.etl(task, paramArray);
        } finally {
            // restore the switch only if it was on before we turned it off
            if (destination != null && oriSwitchStatus) {
                syncSwitch.on(destination);
            } else if (destination == null && oriSwitchStatus) {
                syncSwitch.on(task);
            }
        }
    } finally {
        etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
    }
}
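To make the switch semantics concrete, here is a minimal sketch of an on/off gate with blocking wait semantics, in the spirit of what syncSwitch does in standalone mode. This is an illustration under those assumptions, not canal's actual SyncSwitch implementation (which also supports a distributed ZooKeeper mode):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative on/off gate: incremental-sync threads call await() before each
// batch; the ETL thread calls off() to block them and on() to release them.
public class SyncGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition turnedOn = lock.newCondition();
    private boolean on = true;

    public boolean status() {
        lock.lock();
        try {
            return on;
        } finally {
            lock.unlock();
        }
    }

    public void off() {
        lock.lock();
        try {
            on = false;
        } finally {
            lock.unlock();
        }
    }

    public void on() {
        lock.lock();
        try {
            on = true;
            turnedOn.signalAll();   // wake every blocked incremental-sync thread
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the switch is off; returns immediately while it is on.
    public void await() throws InterruptedException {
        lock.lock();
        try {
            while (!on) {
                turnedOn.await();
            }
        } finally {
            lock.unlock();
        }
    }
}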
The core part is also easy to follow: run a COUNT over the ETL SQL to get the total number of rows. When there are 10,000 rows or more, split the result set into pages of CNT_PER_TASK (10,000) rows each, create a fixed thread pool sized to the number of processors, and submit each page as a task; otherwise import single-threaded. Each task reads its rows and bulk-commits them to ES (upsert or insert) whenever the batch reaches the configured commit size. (A standalone sketch of the paging arithmetic follows the method below.)
{
    EtlResult etlResult = new EtlResult();
    AtomicLong impCount = new AtomicLong();
    List<String> errMsg = new ArrayList<>();
    if (config == null) {
        logger.warn("{} mapping config is null, etl go end ", type);
        etlResult.setErrorMessage(type + " mapping config is null, etl go end ");
        return etlResult;
    }
    long start = System.currentTimeMillis();
    try {
        DruidDataSource dataSource = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

        List<Object> values = new ArrayList<>();
        // Append the optional ETL condition; each "{}" placeholder becomes a "?" bind parameter
        if (config.getMapping().getEtlCondition() != null && params != null) {
            String etlCondition = config.getMapping().getEtlCondition();
            for (String param : params) {
                etlCondition = etlCondition.replace("{}", "?");
                values.add(param);
            }
            sql += " " + etlCondition;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("etl sql : {}", sql);
        }

        // Get the total row count
        String countSql = "SELECT COUNT(1) FROM ( " + sql + ") _CNT ";
        long cnt = (Long) Util.sqlRS(dataSource, countSql, values, rs -> {
            Long count = null;
            try {
                if (rs.next()) {
                    count = ((Number) rs.getObject(1)).longValue();
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return count == null ? 0L : count;
        });

        // Use multiple threads when there are 10,000 rows or more
        if (cnt >= 10000) {
            int threadCount = Runtime.getRuntime().availableProcessors();
            long offset;
            long size = CNT_PER_TASK;
            long workerCnt = cnt / size + (cnt % size == 0 ? 0 : 1);
            if (logger.isDebugEnabled()) {
                logger.debug("workerCnt {} for cnt {} threadCount {}", workerCnt, cnt, threadCount);
            }

            ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
            List<Future<Boolean>> futures = new ArrayList<>();
            // One LIMIT-paged query per worker
            for (long i = 0; i < workerCnt; i++) {
                offset = size * i;
                String sqlFinal = sql + " LIMIT " + offset + "," + size;
                Future<Boolean> future = executor.submit(() -> executeSqlImport(dataSource,
                    sqlFinal,
                    values,
                    config.getMapping(),
                    impCount,
                    errMsg));
                futures.add(future);
            }

            for (Future<Boolean> future : futures) {
                future.get();
            }
            executor.shutdown();
        } else {
            executeSqlImport(dataSource, sql, values, config.getMapping(), impCount, errMsg);
        }

        logger.info("数据全量导入完成, 一共导入 {} 条数据, 耗时: {}", impCount.get(), System.currentTimeMillis() - start);
        etlResult.setResultMessage("导入" + type + " 数据:" + impCount.get() + " 条");
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        errMsg.add(type + " 数据导入异常 =>" + e.getMessage());
    }

    if (errMsg.isEmpty()) {
        etlResult.setSucceeded(true);
    } else {
        etlResult.setErrorMessage(Joiner.on("\n").join(errMsg));
    }
    return etlResult;
}
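To make the sharding concrete: with CNT_PER_TASK = 10000 and, say, cnt = 25000, the method builds three page queries. A minimal standalone sketch of the same offset arithmetic (the counts and SQL here are illustrative):

// Illustrative reproduction of the paging arithmetic above.
public class PagingDemo {
    public static void main(String[] args) {
        long cnt = 25000;                     // hypothetical total row count
        long size = 10000;                    // CNT_PER_TASK
        long workerCnt = cnt / size + (cnt % size == 0 ? 0 : 1); // -> 3 pages
        String sql = "SELECT id, name FROM mytest_user";
        for (long i = 0; i < workerCnt; i++) {
            long offset = size * i;
            // prints LIMIT 0,10000 / LIMIT 10000,10000 / LIMIT 20000,10000
            System.out.println(sql + " LIMIT " + offset + "," + size);
        }
    }
}

One design caveat: LIMIT offset,size gets slower as the offset grows, because MySQL still has to scan past the skipped rows, so deep pages of a very large table are expensive. Keyset pagination (WHERE id > lastId ORDER BY id) avoids that cost, but it is not what the adapter does here.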
{
    ESMapping mapping = (ESMapping) adapterMapping;
    Util.sqlRS(ds, sql, values, rs -> {
        int count = 0;
        try {
            ESBulkRequest esBulkRequest = this.esConnection.new ES6xBulkRequest();
            long batchBegin = System.currentTimeMillis();
            while (rs.next()) {
                Map<String, Object> esFieldData = new LinkedHashMap<>();
                Object idVal = null;
                for (FieldItem fieldItem : mapping.getSchemaItem().getSelectFields().values()) {
                    String fieldName = fieldItem.getFieldName();
                    if (mapping.getSkips().contains(fieldName)) {
                        continue;
                    }
                    // The _id field becomes the document id rather than a field in the doc
                    if (fieldItem.getFieldName().equals(mapping.get_id())) {
                        idVal = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                    } else {
                        Object val = esTemplate.getValFromRS(mapping, rs, fieldName, fieldName);
                        esFieldData.put(Util.cleanColumn(fieldName), val);
                    }
                }

                // Parent/child (join field) handling: record the relation and the parent routing
                if (!mapping.getRelations().isEmpty()) {
                    mapping.getRelations().forEach((relationField, relationMapping) -> {
                        Map<String, Object> relations = new HashMap<>();
                        relations.put("name", relationMapping.getName());
                        if (StringUtils.isNotEmpty(relationMapping.getParent())) {
                            FieldItem parentFieldItem = mapping.getSchemaItem()
                                .getSelectFields()
                                .get(relationMapping.getParent());
                            Object parentVal;
                            try {
                                parentVal = esTemplate.getValFromRS(mapping,
                                    rs,
                                    parentFieldItem.getFieldName(),
                                    parentFieldItem.getFieldName());
                            } catch (SQLException e) {
                                throw new RuntimeException(e);
                            }
                            if (parentVal != null) {
                                relations.put("parent", parentVal.toString());
                                esFieldData.put("$parent_routing", parentVal.toString());
                            }
                        }
                        esFieldData.put(Util.cleanColumn(relationField), relations);
                    });
                }

                if (idVal != null) {
                    String parentVal = (String) esFieldData.remove("$parent_routing");
                    if (mapping.isUpsert()) {
                        ESUpdateRequest esUpdateRequest = this.esConnection.new ES6xUpdateRequest(
                            mapping.get_index(),
                            mapping.get_type(),
                            idVal.toString()).setDoc(esFieldData).setDocAsUpsert(true);
                        if (StringUtils.isNotEmpty(parentVal)) {
                            esUpdateRequest.setRouting(parentVal);
                        }
                        esBulkRequest.add(esUpdateRequest);
                    } else {
                        ESIndexRequest esIndexRequest = this.esConnection.new ES6xIndexRequest(mapping
                            .get_index(), mapping.get_type(), idVal.toString()).setSource(esFieldData);
                        if (StringUtils.isNotEmpty(parentVal)) {
                            esIndexRequest.setRouting(parentVal);
                        }
                        esBulkRequest.add(esIndexRequest);
                    }
                } else {
                    // No _id configured: look up existing documents by pk and update them in place
                    idVal = esFieldData.get(mapping.getPk());
                    ESSearchRequest esSearchRequest = this.esConnection.new ESSearchRequest(mapping.get_index(),
                        mapping.get_type()).setQuery(QueryBuilders.termQuery(mapping.getPk(), idVal))
                        .size(10000);
                    SearchResponse response = esSearchRequest.getResponse();
                    for (SearchHit hit : response.getHits()) {
                        ESUpdateRequest esUpdateRequest = this.esConnection.new ES6xUpdateRequest(mapping
                            .get_index(), mapping.get_type(), hit.getId()).setDoc(esFieldData);
                        esBulkRequest.add(esUpdateRequest);
                    }
                }

                // Flush whenever the batch reaches the configured commit size
                if (esBulkRequest.numberOfActions() % mapping.getCommitBatch() == 0
                    && esBulkRequest.numberOfActions() > 0) {
                    long esBatchBegin = System.currentTimeMillis();
                    ESBulkResponse rp = esBulkRequest.bulk();
                    if (rp.hasFailures()) {
                        rp.processFailBulkResponse("全量数据 etl 异常 ");
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("全量数据批量导入批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                            (System.currentTimeMillis() - batchBegin),
                            (System.currentTimeMillis() - esBatchBegin),
                            esBulkRequest.numberOfActions(),
                            mapping.get_index());
                    }
                    batchBegin = System.currentTimeMillis();
                    esBulkRequest.resetBulk();
                }
                count++;
                impCount.incrementAndGet();
            }

            // Flush the final partial batch
            if (esBulkRequest.numberOfActions() > 0) {
                long esBatchBegin = System.currentTimeMillis();
                ESBulkResponse rp = esBulkRequest.bulk();
                if (rp.hasFailures()) {
                    rp.processFailBulkResponse("全量数据 etl 异常 ");
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("全量数据批量导入最后批次耗时: {}, es执行时间: {}, 批次大小: {}, index; {}",
                        (System.currentTimeMillis() - batchBegin),
                        (System.currentTimeMillis() - esBatchBegin),
                        esBulkRequest.numberOfActions(),
                        mapping.get_index());
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            errMsg.add(mapping.get_index() + " etl failed! ==>" + e.getMessage());
            throw new RuntimeException(e);
        }
        return count;
    });
    return true;
}
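The batching pattern above — accumulate actions, flush every commitBatch actions, then flush the remainder — is independent of canal's ES6x wrapper classes. A minimal sketch of the same cadence against the plain Elasticsearch 6.x RestHighLevelClient (the index name, "_doc" type, "id" field, and commitBatch value are all illustrative assumptions):

import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class BulkUpsertSketch {

    // Flush the bulk buffer every `commitBatch` actions, plus a final flush.
    static void upsertAll(RestHighLevelClient client, Iterable<Map<String, Object>> docs,
                          String index, int commitBatch) throws Exception {
        BulkRequest bulk = new BulkRequest();
        for (Map<String, Object> doc : docs) {
            String id = String.valueOf(doc.get("id"));   // hypothetical _id field
            // docAsUpsert: update if the document exists, insert it otherwise
            bulk.add(new UpdateRequest(index, "_doc", id).doc(doc).docAsUpsert(true));
            if (bulk.numberOfActions() >= commitBatch) {
                BulkResponse resp = client.bulk(bulk, RequestOptions.DEFAULT);
                if (resp.hasFailures()) {
                    throw new IllegalStateException(resp.buildFailureMessage());
                }
                bulk = new BulkRequest();                 // start a fresh batch
            }
        }
        if (bulk.numberOfActions() > 0) {                 // flush the remainder
            BulkResponse resp = client.bulk(bulk, RequestOptions.DEFAULT);
            if (resp.hasFailures()) {
                throw new IllegalStateException(resp.buildFailureMessage());
            }
        }
    }
}

Note one difference: the adapter checks numberOfActions() % commitBatch == 0, which flushes on the same cadence only while each row contributes exactly one action; the sketch uses >= commitBatch, which also holds up when a single row adds several actions (as in the parent/child search-and-update path above).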