基于微服务架构设计时,一般每个服务的数据库是独立的,但是在做数据统计分析的时候,这往往很不方便。下面的代码实现了按照业务需求,把多个库的表同步到一个库中。当然,任何解决方案都不是空中楼阁,都要面向业务。这种方法适合以下场景:
1.数据量不是特别大
2.实时性要求不是特别高
sync.png
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import cn.ydyun360.common.controller.QuerySqlParam;
import cn.ydyun360.common.utility.SpringContextHolder;
import cn.ydyun360.stats.service.TableSyncService;
import cn.ydyun360.stats.service.model.TableSyncResult;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
public class TableSyncServiceImpl implements TableSyncService {
@Autowired
private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
@Override
public Mono<TableSyncResult> sync(String database, String table, Integer limit) {
var result = new TableSyncResult();
result.setStartTime(LocalDateTime.now());
result.setDatabase(database);
result.setTable(table);
result.setLimit(limit);
var webClient = getWebClient(database);
var destUpdateTime = getDestUpdateTime(database, table);
return getSrcUpdateTime(webClient, table).flatMap(srcUpdateTime -> {
return getTotal(webClient, table, srcUpdateTime, destUpdateTime).flatMap(total -> {
result.setTotal(total);
if (total > 0) {
return getTableSchema(webClient, table).flatMap(schema -> {
var sb = new StringBuilder("INSERT INTO ").append(database).append("_").append(table)
.append("(");
for (var i = 0; i < schema.size(); i++) {
var map = schema.get(i);
var field = map.get("COLUMN_NAME").toString();
sb.append(field);
if (i < schema.size() - 1) {
sb.append(",");
}
}
sb.append(") VALUES(");
for (var i = 0; i < schema.size(); i++) {
var map = schema.get(i);
var field = map.get("COLUMN_NAME").toString();
sb.append(":").append(field);
if (i < schema.size() - 1) {
sb.append(",");
}
}
sb.append(") ON DUPLICATE KEY UPDATE ");
var f = schema.stream().filter(item -> !item.get("COLUMN_NAME").equals("id"))
.collect(Collectors.toList());
for (var i = 0; i < f.size(); i++) {
var map = f.get(i);
var field = map.get("COLUMN_NAME").toString();
sb.append(field).append("=:").append(field);
if (i < f.size() - 1) {
sb.append(",");
}
}
var list = new ArrayList<Mono<Boolean>>();
var start = 0;
while (start < total) {
final Integer s = start;
list.add(getData(webClient, table, srcUpdateTime, destUpdateTime, s, limit, sb.toString()));
start += limit;
}
return Flux.concat(list).collectList().map(m -> {
result.setEndTime(LocalDateTime.now());
return result;
});
});
}
result.setEndTime(LocalDateTime.now());
return Mono.just(result);
});
});
}
private WebClient getWebClient(String database) {
return SpringContextHolder.getBean(database + "ReportServiceWebClient");
}
private String getDestUpdateTime(String database, String table) {
var sb = new StringBuilder("SELECT update_time FROM ").append(database).append("_").append(table)
.append(" ORDER BY update_time DESC LIMIT 1");
return namedParameterJdbcTemplate.getJdbcTemplate().queryForObject(sb.toString(), String.class);
}
private Mono<String> getSrcUpdateTime(WebClient webClient, String table) {
var sb = new StringBuilder("SELECT update_time FROM ").append(table)
.append(" ORDER BY update_time DESC LIMIT 1");
var querySqlParam = new QuerySqlParam();
querySqlParam.setSql(sb.toString());
return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
}).map(result -> result.get(0).get("update_time").toString());
}
private Mono<Long> getTotal(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime) {
var sb = new StringBuilder("SELECT COUNT(id) AS count FROM ").append(table);
if (StringUtils.isNotBlank(destUpdateTime)) {
sb.append(" WHERE update_time >= '").append(destUpdateTime).append("' AND update_time <= '")
.append(srcUpdateTime).append("'");
}
var querySqlParam = new QuerySqlParam();
querySqlParam.setSql(sb.toString());
return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
}).map(result -> null == result || result.isEmpty() ? 0L
: Long.valueOf(result.get(0).get("count").toString()));
}
private Mono<Boolean> getData(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime,
Integer start, Integer limit, String updateSql) {
var sb = new StringBuilder("SELECT * FROM ").append(table).append(" WHERE update_time >= '")
.append(destUpdateTime).append("' AND update_time <= '").append(srcUpdateTime)
.append("' ORDER BY update_time").append(" LIMIT ").append(start).append(",").append(limit);
var querySqlParam = new QuerySqlParam();
querySqlParam.setSql(sb.toString());
return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
}).map(data -> {
if (null != data && !data.isEmpty()) {
namedParameterJdbcTemplate.batchUpdate(updateSql, SqlParameterSourceUtils.createBatch(data));
}
return true;
});
}
private Mono<List<Map<String, Object>>> getTableSchema(WebClient webClient, String table) {
return webClient.get().uri("/tables/{table}/columns", table).retrieve()
.bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
});
}
}
网友评论