美文网首页Spring Boot
一种数据库同步简易方法

一种数据库同步简易方法

作者: EasyNetCN | 来源:发表于2020-03-29 07:03 被阅读0次

    基于微服务架构设计时,一般数据库是独立的,但是在数据统计分析的时候,很多时候很不方便,下面的代码实现了根据需求可以把多个库的表,按照业务需求同步到一个库中。当然任何解决方案都不是空中楼阁,要面向业务。这种方法适合以下场景:

    1.数据量不是特别大

    2.实时性要求不是特别高

    sync.png
    import java.time.LocalDateTime;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.ParameterizedTypeReference;
    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
    import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
    import org.springframework.stereotype.Service;
    import org.springframework.web.reactive.function.client.WebClient;
    
    import cn.ydyun360.common.controller.QuerySqlParam;
    import cn.ydyun360.common.utility.SpringContextHolder;
    import cn.ydyun360.stats.service.TableSyncService;
    import cn.ydyun360.stats.service.model.TableSyncResult;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @Service
    public class TableSyncServiceImpl implements TableSyncService {
        @Autowired
        private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
    
        @Override
        public Mono<TableSyncResult> sync(String database, String table, Integer limit) {
            var result = new TableSyncResult();
    
            result.setStartTime(LocalDateTime.now());
            result.setDatabase(database);
            result.setTable(table);
            result.setLimit(limit);
    
            var webClient = getWebClient(database);
            var destUpdateTime = getDestUpdateTime(database, table);
    
            return getSrcUpdateTime(webClient, table).flatMap(srcUpdateTime -> {
                return getTotal(webClient, table, srcUpdateTime, destUpdateTime).flatMap(total -> {
                    result.setTotal(total);
    
                    if (total > 0) {
                        return getTableSchema(webClient, table).flatMap(schema -> {
                            var sb = new StringBuilder("INSERT INTO ").append(database).append("_").append(table)
                                    .append("(");
    
                            for (var i = 0; i < schema.size(); i++) {
                                var map = schema.get(i);
                                var field = map.get("COLUMN_NAME").toString();
    
                                sb.append(field);
    
                                if (i < schema.size() - 1) {
                                    sb.append(",");
                                }
                            }
    
                            sb.append(") VALUES(");
    
                            for (var i = 0; i < schema.size(); i++) {
                                var map = schema.get(i);
                                var field = map.get("COLUMN_NAME").toString();
    
                                sb.append(":").append(field);
    
                                if (i < schema.size() - 1) {
                                    sb.append(",");
                                }
                            }
    
                            sb.append(") ON DUPLICATE KEY UPDATE ");
    
                            var f = schema.stream().filter(item -> !item.get("COLUMN_NAME").equals("id"))
                                    .collect(Collectors.toList());
    
                            for (var i = 0; i < f.size(); i++) {
                                var map = f.get(i);
                                var field = map.get("COLUMN_NAME").toString();
    
                                sb.append(field).append("=:").append(field);
    
                                if (i < f.size() - 1) {
                                    sb.append(",");
                                }
                            }
    
                            var list = new ArrayList<Mono<Boolean>>();
                            var start = 0;
    
                            while (start < total) {
                                final Integer s = start;
    
                                list.add(getData(webClient, table, srcUpdateTime, destUpdateTime, s, limit, sb.toString()));
    
                                start += limit;
                            }
    
                            return Flux.concat(list).collectList().map(m -> {
                                result.setEndTime(LocalDateTime.now());
    
                                return result;
                            });
                        });
    
                    }
    
                    result.setEndTime(LocalDateTime.now());
    
                    return Mono.just(result);
                });
            });
        }
    
        private WebClient getWebClient(String database) {
            return SpringContextHolder.getBean(database + "ReportServiceWebClient");
        }
    
        private String getDestUpdateTime(String database, String table) {
            var sb = new StringBuilder("SELECT update_time FROM ").append(database).append("_").append(table)
                    .append(" ORDER BY update_time DESC LIMIT 1");
    
            return namedParameterJdbcTemplate.getJdbcTemplate().queryForObject(sb.toString(), String.class);
        }
    
        private Mono<String> getSrcUpdateTime(WebClient webClient, String table) {
            var sb = new StringBuilder("SELECT update_time FROM ").append(table)
                    .append(" ORDER BY update_time DESC LIMIT 1");
    
            var querySqlParam = new QuerySqlParam();
    
            querySqlParam.setSql(sb.toString());
    
            return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                    .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                    }).map(result -> result.get(0).get("update_time").toString());
        }
    
        private Mono<Long> getTotal(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime) {
            var sb = new StringBuilder("SELECT COUNT(id) AS count FROM ").append(table);
    
            if (StringUtils.isNotBlank(destUpdateTime)) {
                sb.append(" WHERE update_time >= '").append(destUpdateTime).append("' AND update_time <= '")
                        .append(srcUpdateTime).append("'");
            }
    
            var querySqlParam = new QuerySqlParam();
    
            querySqlParam.setSql(sb.toString());
    
            return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                    .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                    }).map(result -> null == result || result.isEmpty() ? 0L
                            : Long.valueOf(result.get(0).get("count").toString()));
        }
    
        private Mono<Boolean> getData(WebClient webClient, String table, String srcUpdateTime, String destUpdateTime,
                Integer start, Integer limit, String updateSql) {
            var sb = new StringBuilder("SELECT * FROM ").append(table).append(" WHERE update_time >= '")
                    .append(destUpdateTime).append("' AND update_time <= '").append(srcUpdateTime)
                    .append("' ORDER BY update_time").append(" LIMIT ").append(start).append(",").append(limit);
    
            var querySqlParam = new QuerySqlParam();
    
            querySqlParam.setSql(sb.toString());
    
            return webClient.post().uri("/databases/sql/query").bodyValue(querySqlParam).retrieve()
                    .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                    }).map(data -> {
                        if (null != data && !data.isEmpty()) {
                            namedParameterJdbcTemplate.batchUpdate(updateSql, SqlParameterSourceUtils.createBatch(data));
                        }
    
                        return true;
                    });
        }
    
        private Mono<List<Map<String, Object>>> getTableSchema(WebClient webClient, String table) {
            return webClient.get().uri("/tables/{table}/columns", table).retrieve()
                    .bodyToMono(new ParameterizedTypeReference<List<Map<String, Object>>>() {
                    });
        }
    }
    

    相关文章

      网友评论

        本文标题:一种数据库同步简易方法

        本文链接:https://www.haomeiwen.com/subject/drqyuhtx.html