项目上最近有在不同地区的站点间同步应用数据的需求,为此开始了同步系统的设计和开发。
相关代码:wangshuai@github
系统的大体架构如下:
开发过程中的问题:
- 采集数据的形式
- dubbo服务的提供和区分
- 被同步数据落地的实现
- 两个站点间数据的关联
采集数据的形式
在所有的DAO方法(在这里是Mybatis的mapper)上增加一个切面,拦截增、删、改数据的ID,并将数据ID + 表名 + 应用名信息通过dubbo调用记录至同步系统的数据库中,这样当同步任务调用时,同步系统就能根据记录的数据去相应的应用(业务系统)通过dubbo服务取出具体的业务数据,并整合、通过HTTP将数据发送出去。
同时在所有的manager上也增加一个切面,配合缓存实现相关的事务控制。
dubbo服务的提供和区分
由于业务系统的具体数据只能由业务系统提供和操作,同步系统无法直接访问业务系统的数据库表,因此同步系统和业务系统间的数据交互只能通过dubbo服务实现。
每个业务系统都会提供自己的数据服务,并且功能都相同。因此这里我采用的是同步系统提供一个client包的方式,业务系统依赖这个client包就会在应用启动时自动提供相关服务。
数据查询服务及实现:
/**
* 同步系统查询各业务系统数据服务
*
* @author wangshuai
* @version V1.0
* @since 2017-09-14 15:32
*/
public interface SyncDataQueryService {
/**
* 查询数据接口
*
* @param tabName
* @param id
* @return
*/
ResponseDTO<String> querySyncableData(Long id, String tabName);
}
/**
* TODO
*
* @author wangshuai
* @version V1.0
* @since 2017-09-14 13:53
*/
@Component
public class SyncDataQueryServiceImpl implements SyncDataQueryService {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataQueryServiceImpl.class);
@Value("${dubbo.application.name}")
private static String application;
@Reference
private MapperConfigService mapperConfigService;
public ResponseDTO<String> querySyncableData(Long id, String tabName) {
ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
try {
MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
......
String mapperClass = mapperConfigDTO.getMapperClass();
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
Map resultMap = mapper.findSyncableDataById(id);
......
responseDTO.setData(JSON.toJSONString(resultMap));
} catch (ClassNotFoundException e) {
......
}
return responseDTO;
}
}
数据落地服务接口及实现:
/**
* @Type SyncOperateService
* @Desc 同步系统操作各业务系统数据服务
* @author liuhj
* @created 2017年9月20日 下午1:45:55
* @version 1.0.0
*/
public interface SyncDataOperateService {
/**
* 根据操作类型进行同步方法操作接口
*
* @param id
* @param relationId
* @param tabName
* @param params
* @param operateType
* @return
*/
ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params, String operateType);
}
/**
* @Type SyncOperateServiceImpl
* @Desc 同步系统操作各业务系统数据服务
* @author liuhj
* @created 2017年9月20日 下午1:53:26
* @version 1.0.0
*/
@Component
public class SyncDataOperateServiceImpl implements SyncDataOperateService {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataOperateServiceImpl.class);
@Value("${dubbo.application.name}")
private static String application;
/**
* 配置系统服务
*/
@Reference
private MapperConfigService mapperConfigService;
@Override
public ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params,
String operateType) {
ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
try {
MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
......
String mapperClass = mapperConfigDTO.getMapperClass();
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
int count = 0;
Long callBackId = null;
if ("I".equalsIgnoreCase(operateType)) {
count = mapper.insertSyncableData(params);
callBackId = (long) (int) params.get("id");
}
if ("U".equalsIgnoreCase(operateType)) {
params.put("id", relationId);
count = mapper.updateSyncableData(params);
callBackId = null;
}
if ("D".equalsIgnoreCase(operateType)) {
count = mapper.deleteSyncableDataById(relationId);
callBackId = null;
}
......
responseDTO.setData(String.valueOf(callBackId));
} catch (Exception e) {
LOGGER.error("SyncDataOperateServiceImpl.operateSyncableData ->加载mapper类失败", e);
responseDTO.setDataMessage(ExceptionEnum.ERROR.getCode(), "加载mapper类失败");
return responseDTO;
}
return responseDTO;
}
}
通过java API在应用启动时动态注册dubbo服务(目的是通过version将不同的服务区分开来):
/**
* 各业务系统注册不同的服务实现类
* @author wangshuai
* @version V1.0
* @since 2017-09-20 11:13
*/
@Component
public class DubboServiceRegister {
@Value("${dubbo.application.name}")
private String application;
@Value("${dubbo.registry.address}")
private String zkAddress;
@Value("${dubbo.protocol.port}")
private int dubboPort;
/**
* 服务实现
*/
@Resource
private SyncDataQueryService syncDataQueryService;
@Resource
private SyncDataOperateService syncDataOperateService;
@PostConstruct
public void regist() {
//当前应用配置
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName(application);
//连接注册中心配置
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(zkAddress);
//服务提供者协议配置
ProtocolConfig protocolConfig = new ProtocolConfig();
protocolConfig.setName("dubbo");
protocolConfig.setPort(dubboPort);
//服务提供者暴露服务配置
ServiceConfig<SyncDataQueryService> dataQueryServiceConfig = new ServiceConfig<>();
dataQueryServiceConfig.setApplication(applicationConfig);
dataQueryServiceConfig.setRegistry(registryConfig);
dataQueryServiceConfig.setProtocol(protocolConfig);
dataQueryServiceConfig.setInterface(SyncDataQueryService.class);
dataQueryServiceConfig.setRef(syncDataQueryService);
dataQueryServiceConfig.setVersion(application);
ServiceConfig<SyncDataOperateService> dataOperateServiceConfig = new ServiceConfig<>();
dataOperateServiceConfig.setApplication(applicationConfig);
dataOperateServiceConfig.setRegistry(registryConfig);
dataOperateServiceConfig.setProtocol(protocolConfig);
dataOperateServiceConfig.setInterface(SyncDataOperateService.class);
dataOperateServiceConfig.setRef(syncDataOperateService);
dataOperateServiceConfig.setVersion(application);
//暴露及注册服务
dataQueryServiceConfig.export();
dataOperateServiceConfig.export();
}
}
如此,在同步系统调用服务时,就能通过version区分到对应的服务。
被同步数据落地的实现
当要同步的数据以JSON的形式到达分站点(被同步站点)时,被API系统接入并通过dubbo将数据传给同步系统,同步系统再调用各站点的数据落地服务实现类,这里就有一个问题:数据如何落地到数据库中?如果直接动态拼接sql直接执行是一种比较简单的实现方式,但是我们这里使用的是mybatis,那么如何找到对应的mapper并动态执行插入、更新、删除的方法?
在这里我的设计是定义一个父接口:
public interface SyncableMapper {
Map findSyncableDataById(Long id);
int insertSyncableData(Map paramMap);
int updateSyncableData(Map paramMap);
int deleteSyncableDataById(Long id);
}
这个接口定义了对业务系统数据表进行同步的公共方法,然后我让需要同步的mapper接口继承这个公共接口,并在mapper-xml里定义sql,这样同步时直接操作父接口的引用(实现类从spring上下文中根据条件动态获取):
@Repository("dubboApplicationDAO")
public interface DubboApplicationMapper extends SyncableMapper {
......
}
获取对应mapper并操作的代码(SyncDataOperateServiceImpl 类中):
//从配置中获取mapper的类的全限定名
String mapperClass = mapperConfigDTO.getMapperClass();
//动态获取操作对应表的mapper
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
if ("I".equalsIgnoreCase(operateType)) {
mapper.insertSyncableData(params);
}
if ("U".equalsIgnoreCase(operateType)) {
params.put("id", relationId);
count = mapper.updateSyncableData(params);
}
if ("D".equalsIgnoreCase(operateType)) {
count = mapper.deleteSyncableDataById(relationId);
}
那么自然的,有了上述规则后,采集同步数据时我们只记录继承自我们定义的父接口的mapper的操作数据(在执行采集切面时加入判断):
public void doCollect(JoinPoint point) {
Class mapperType = point.getSignature().getDeclaringType();
if(mapperType.equals(SyncableMapper.class)) {
//如果执行的是SyncableMapper接口定义的方法 则不执行切面逻辑(只对实际的业务操作进行拦截)
System.out.println("执行的是SyncableMapper接口定义的方法 不执行切面逻辑");
return;
}
Class[] classes = point.getSignature().getDeclaringType().getInterfaces();
boolean flag = Arrays.stream(classes).anyMatch((c) -> c.equals(SyncableMapper.class));
if(!flag) {
//如果被拦截的mapper没有实现SyncableMapper接口,则不对其进行同步操作
System.out.println("被拦截的mapper没有实现SyncableMapper接口,不对其进行同步操作");
return;
}
String fullMethodSignature = point.getSignature().getDeclaringTypeName() + "." + point.getSignature().getName();
String sqlType = mybatisSqlUtils.getSqlType(fullMethodSignature);
if("SELECT".equalsIgnoreCase(sqlType)) {
return;
}
......//
}
两个站点间数据的关联
假设主站点中的某张表的某一条数据, id为A, 将其同步至分站点对应表中后的id是B,那么我们应该在数据在分站点中插入时,记录A、B两个id之间的关联,方便在做同步更新或者删除操作时使用:
CREATE TABLE `cloud_sync_relation` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '序号',
`gmt_created` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`remark` varchar(255) NOT NULL COMMENT '备注',
`is_deleted` tinyint(2) NOT NULL COMMENT '是否删除 0否1是',
`from_id` bigint(20) NOT NULL COMMENT '关联方id',
`to_id` bigint(20) NOT NULL COMMENT '被关联方id',
`relate_site_code` varchar(50) NOT NULL COMMENT '关联站点code',
`relate_resource` varchar(50) NOT NULL COMMENT '关联表名',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='同步系统关联关系'
附:
动态获取dubbo服务对象帮助类:
/**
* 动态获取dubbo服务对象帮助类
* @author wangshuai
* @version V1.0
* @since 2017-09-15 16:34
*/
@Component
public class DubboUtil {
public static final String VERSION = "VERSION";
public static final String GROUP = "GROUP";
public static final String ZOOKEEPERURL = "ZOOKEEPERURL";
public static final String APPLICATION = "APPLICATION";
@Value("${dubbo.application.name}")
private String appName;
@Value("${dubbo.registry.address}")
private String zkUrl;
public <T> T getDubboService(Class<T> clazz, Map<String, String> paramMap){
ApplicationConfig applicationConfig = new ApplicationConfig();
String app = paramMap.get(APPLICATION);
if(app != null && !"".equals(app)) {
applicationConfig.setName(app);
} else {
applicationConfig.setName(appName);
}
RegistryConfig registryConfig = new RegistryConfig();
String zookeeperurl = paramMap.get(ZOOKEEPERURL);
if(zookeeperurl != null && !"".equals(zookeeperurl)) {
registryConfig.setAddress(zookeeperurl);
} else {
registryConfig.setAddress(zkUrl);
}
applicationConfig.setRegistry(registryConfig);
ReferenceConfig<T> rc = new ReferenceConfig<T>();
rc.setApplication(applicationConfig);
String version = paramMap.get(VERSION);
if(version != null && !"".equals(version)) {
rc.setVersion(version);
}
String group = paramMap.get(GROUP);
if(group != null && !"".equals(group)) {
rc.setGroup(group);
}
rc.setInterface(clazz);
//缓存ReferenceConfig对象,防止内存和连接泄露
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
return cache.get(rc);
}
}
网友评论