美文网首页
站点间数据同步系统设计与实现 201709@wedoctor

站点间数据同步系统设计与实现 201709@wedoctor

作者: 王帅199207 | 来源:发表于2017-10-13 11:37 被阅读113次

项目上最近有在不同地区的站点间同步应用数据的需求,为此开始了同步系统的设计和开发。
相关代码:wangshuai@github


系统的大体架构如下


开发过程中的问题:

  • 采集数据的形式
  • dubbo服务的提供和区分
  • 被同步数据落地的实现
  • 两个站点间数据的关联

采集数据的形式

在所有的DAO方法(在这里是Mybatis的mapper)上增加一个切面,拦截增、删、改数据的ID,并将数据ID + 表名 + 应用名信息通过dubbo调用记录至同步系统的数据库中,这样当同步任务调用时,同步系统就能根据记录的数据去相应的应用(业务系统)通过dubbo服务取出具体的业务数据,并整合、通过HTTP将数据发送出去。

同时在所有的manager上也增加一个切面,配合缓存实现相关的事务控制。


dubbo服务的提供和区分

由于业务系统的具体数据只能由业务系统提供和操作,同步系统无法直接访问业务系统的数据库表,因此同步系统和业务系统间的数据交互只能通过dubbo服务实现。

每个业务系统都会提供自己的数据服务,并且功能都相同。因此这里我采用的是同步系统提供一个client包的方式,业务系统依赖这个client包就会在应用启动时自动提供相关服务。

数据查询服务及实现:

/**
 * 同步系统查询各业务系统数据服务
 *
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-14 15:32
 */
public interface SyncDataQueryService {

    /**
     * 查询数据接口
     *
     * @param tabName
     * @param id
     * @return
     */
    ResponseDTO<String> querySyncableData(Long id, String tabName);

}

/**
 * TODO
 *
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-14 13:53
 */
@Component
public class SyncDataQueryServiceImpl implements SyncDataQueryService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataQueryServiceImpl.class);
    
    @Value("${dubbo.application.name}")
    private static String application;

    @Reference
    private MapperConfigService mapperConfigService;

    public ResponseDTO<String> querySyncableData(Long id, String tabName) {
        ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
        try {
            MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
            ......
            
            String mapperClass = mapperConfigDTO.getMapperClass();
            SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
            ......

            Map resultMap = mapper.findSyncableDataById(id);
            ......
            responseDTO.setData(JSON.toJSONString(resultMap));
        } catch (ClassNotFoundException e) {
           ......
        }
        return responseDTO;
    }
}

数据落地服务接口及实现:

/**
 * @Type SyncOperateService
 * @Desc 同步系统操作各业务系统数据服务
 * @author liuhj 
 * @created 2017年9月20日 下午1:45:55
 * @version 1.0.0
 */
public interface SyncDataOperateService {
    
    /**
     * 根据操作类型进行同步方法操作接口
     *
     * @param id
     * @param relationId 
     * @param tabName
     * @param params
     * @param operateType
     * @return
     */
    ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params, String operateType);

}

/**
 * @Type SyncOperateServiceImpl
 * @Desc 同步系统操作各业务系统数据服务
 * @author liuhj
 * @created 2017年9月20日 下午1:53:26
 * @version 1.0.0
 */
@Component
public class SyncDataOperateServiceImpl implements SyncDataOperateService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataOperateServiceImpl.class);

    @Value("${dubbo.application.name}")
    private static String application;

    /**
     * 配置系统服务
     */
    @Reference
    private MapperConfigService mapperConfigService;

    @Override
    public ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params,
            String operateType) {
        ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
        try {
            MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
            ......
            String mapperClass = mapperConfigDTO.getMapperClass();
            SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
            ......
            int count = 0;
            Long callBackId = null;
            if ("I".equalsIgnoreCase(operateType)) {
                count = mapper.insertSyncableData(params);
                callBackId = (long) (int) params.get("id");
            }
            if ("U".equalsIgnoreCase(operateType)) {
                params.put("id", relationId);
                count = mapper.updateSyncableData(params);
                callBackId = null;
            }
            if ("D".equalsIgnoreCase(operateType)) {
                count = mapper.deleteSyncableDataById(relationId);
                callBackId = null;
            }
            ......
            responseDTO.setData(String.valueOf(callBackId));
        } catch (Exception e) {
            LOGGER.error("SyncDataOperateServiceImpl.operateSyncableData ->加载mapper类失败", e);
            responseDTO.setDataMessage(ExceptionEnum.ERROR.getCode(), "加载mapper类失败");
            return responseDTO;
        }
        return responseDTO;
    }
}

通过java API在应用启动时动态注册dubbo服务(目的是通过version将不同的服务区分开来):

/**
 * 各业务系统注册不同的服务实现类
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-20 11:13
 */
@Component
public class DubboServiceRegister {

    @Value("${dubbo.application.name}")
    private String application;

    @Value("${dubbo.registry.address}")
    private String zkAddress;

    @Value("${dubbo.protocol.port}")
    private int dubboPort;

    /**
     * 服务实现
     */
    @Resource
    private SyncDataQueryService syncDataQueryService;

    @Resource
    private SyncDataOperateService syncDataOperateService;

    @PostConstruct
    public void regist() {

        //当前应用配置
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName(application);

        //连接注册中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress(zkAddress);

        //服务提供者协议配置
        ProtocolConfig protocolConfig = new ProtocolConfig();
        protocolConfig.setName("dubbo");
        protocolConfig.setPort(dubboPort);

        //服务提供者暴露服务配置
        ServiceConfig<SyncDataQueryService> dataQueryServiceConfig = new ServiceConfig<>();
        dataQueryServiceConfig.setApplication(applicationConfig);
        dataQueryServiceConfig.setRegistry(registryConfig);
        dataQueryServiceConfig.setProtocol(protocolConfig);
        dataQueryServiceConfig.setInterface(SyncDataQueryService.class);
        dataQueryServiceConfig.setRef(syncDataQueryService);
        dataQueryServiceConfig.setVersion(application);

        ServiceConfig<SyncDataOperateService> dataOperateServiceConfig = new ServiceConfig<>();
        dataOperateServiceConfig.setApplication(applicationConfig);
        dataOperateServiceConfig.setRegistry(registryConfig);
        dataOperateServiceConfig.setProtocol(protocolConfig);
        dataOperateServiceConfig.setInterface(SyncDataOperateService.class);
        dataOperateServiceConfig.setRef(syncDataOperateService);
        dataOperateServiceConfig.setVersion(application);

        //暴露及注册服务
        dataQueryServiceConfig.export();
        dataOperateServiceConfig.export();
    }
}

如此,在同步系统调用服务时,就能通过version区分到对应的服务。


被同步数据落地的实现

当要同步的数据以JSON的形式到达分站点(被同步站点)时,被API系统接入并通过dubbo将数据传给同步系统,同步系统再调用各站点的数据落地服务实现类,这里就有一个问题:数据如何落地到数据库中?如果直接动态拼接sql直接执行是一种比较简单的实现方式,但是我们这里使用的是mybatis,那么如何找到对应的mapper并动态执行插入、更新、删除的方法?

在这里我的设计是定义一个父接口:

public interface SyncableMapper {

    Map findSyncableDataById(Long id);

    int insertSyncableData(Map paramMap);

    int updateSyncableData(Map paramMap);

    int deleteSyncableDataById(Long id);

}

这个接口定义了对业务系统数据表进行同步的公共方法,然后我让需要同步的mapper接口继承这个公共接口,并在mapper-xml里定义sql,这样同步时直接操作父接口的引用(实现类从spring上下文中根据条件动态获取):

@Repository("dubboApplicationDAO")
public interface DubboApplicationMapper extends SyncableMapper {

    ......
}

获取对应mapper并操作的代码(SyncDataOperateServiceImpl 类中):

//从配置中获取mapper的类的全限定名
String mapperClass = mapperConfigDTO.getMapperClass();

//动态获取操作对应表的mapper
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......

if ("I".equalsIgnoreCase(operateType)) {
   mapper.insertSyncableData(params);
}
if ("U".equalsIgnoreCase(operateType)) {
    params.put("id", relationId);
    count = mapper.updateSyncableData(params);
}
if ("D".equalsIgnoreCase(operateType)) {
    count = mapper.deleteSyncableDataById(relationId);
}

那么自然的,有了上述规则后,采集同步数据时我们只记录继承自我们定义的父接口的mapper的操作数据(在执行采集切面时加入判断):

public void doCollect(JoinPoint point) {
        Class mapperType = point.getSignature().getDeclaringType();

        if(mapperType.equals(SyncableMapper.class)) {
            //如果执行的是SyncableMapper接口定义的方法 则不执行切面逻辑(只对实际的业务操作进行拦截)
            System.out.println("执行的是SyncableMapper接口定义的方法 不执行切面逻辑");
            return;
        }

        Class[] classes = point.getSignature().getDeclaringType().getInterfaces();
        boolean flag = Arrays.stream(classes).anyMatch((c) -> c.equals(SyncableMapper.class));
        if(!flag) {
            //如果被拦截的mapper没有实现SyncableMapper接口,则不对其进行同步操作
            System.out.println("被拦截的mapper没有实现SyncableMapper接口,不对其进行同步操作");
            return;
        }

        String fullMethodSignature = point.getSignature().getDeclaringTypeName() + "." + point.getSignature().getName();
        String sqlType = mybatisSqlUtils.getSqlType(fullMethodSignature);

        if("SELECT".equalsIgnoreCase(sqlType)) {
            return;
        }
       ......//
}

两个站点间数据的关联

假设主站点中的某张表的某一条数据, id为A, 将其同步至分站点对应表中后的id是B,那么我们应该在数据在分站点中插入时,记录A、B两个id之间的关联,方便在做同步更新或者删除操作时使用:

CREATE TABLE `cloud_sync_relation` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '序号',
  `gmt_created` datetime NOT NULL COMMENT '创建时间',
  `gmt_modified` datetime NOT NULL COMMENT '修改时间',
  `remark` varchar(255) NOT NULL COMMENT '备注',
  `is_deleted` tinyint(2) NOT NULL COMMENT '是否删除 0否1是',
  `from_id` bigint(20) NOT NULL COMMENT '关联方id',
  `to_id` bigint(20) NOT NULL COMMENT '被关联方id',
  `relate_site_code` varchar(50) NOT NULL COMMENT '关联站点code',
  `relate_resource` varchar(50) NOT NULL COMMENT '关联表名',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='同步系统关联关系'

附:

动态获取dubbo服务对象帮助类:

/**
 * 动态获取dubbo服务对象帮助类
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-15 16:34
 */
@Component
public class DubboUtil {

    public static final String VERSION = "VERSION";

    public static final String GROUP = "GROUP";

    public static final String ZOOKEEPERURL = "ZOOKEEPERURL";

    public static final String APPLICATION = "APPLICATION";

    @Value("${dubbo.application.name}")
    private String appName;

    @Value("${dubbo.registry.address}")
    private String zkUrl;

    public <T> T getDubboService(Class<T> clazz, Map<String, String> paramMap){
        ApplicationConfig applicationConfig = new ApplicationConfig();

        String app = paramMap.get(APPLICATION);
        if(app != null && !"".equals(app)) {
            applicationConfig.setName(app);
        } else {
            applicationConfig.setName(appName);
        }


        RegistryConfig registryConfig = new RegistryConfig();

        String zookeeperurl = paramMap.get(ZOOKEEPERURL);
        if(zookeeperurl != null && !"".equals(zookeeperurl)) {
            registryConfig.setAddress(zookeeperurl);
        } else {
            registryConfig.setAddress(zkUrl);
        }

        applicationConfig.setRegistry(registryConfig);

        ReferenceConfig<T> rc = new ReferenceConfig<T>();
        rc.setApplication(applicationConfig);

        String version = paramMap.get(VERSION);
        if(version != null && !"".equals(version)) {
            rc.setVersion(version);
        }

        String group = paramMap.get(GROUP);
        if(group != null && !"".equals(group)) {
            rc.setGroup(group);
        }
        rc.setInterface(clazz);

        //缓存ReferenceConfig对象,防止内存和连接泄露
        ReferenceConfigCache cache = ReferenceConfigCache.getCache();
        return cache.get(rc);
    }

}

相关文章

网友评论

      本文标题:站点间数据同步系统设计与实现 201709@wedoctor

      本文链接:https://www.haomeiwen.com/subject/ubepyxtx.html