在分布式系统中,最常见的问题就是如何保证数据一致性,本文主要解决最终一致性。项目A中表A进行新增和修改操作时,项目B中的表B也需要进行新增和修改,生产者为A,消费者为B。
在项目A中新建实体MsgConsumerMessage,在项目B中新建表MsgProducerMessage:
public class MsgConsumerMessage {
//主键ID,对应MsgProducerMessage的ID,不能设置为自增
private Long id;
//消息服务类型ID
private Integer serviceId;
//消息要同步的json数据
private String params;
//同步的时间
private Long consumTime;
}
public class MsgProducerMessage {
//主键ID
private Long id;
//消息服务类型ID
private Integer serviceId;
//消费者提供的API地址,用于push消息给消费者
private String node;
//要push的json数据
private String params;
//消息创建时间
private Long createTime;
//是否同步
private Boolean sync;
}
在项目A需要同步数据的方法中,新增一条MsgProducerMessage记录,使用httpclient向项目B发送请求,接收到项目B接口的返回值后,r若成功则将sync改为true,失败则不改。另在项目A中添加定时任务,循环遍历未同步的MsgProducerMessage记录,依次访问消费者的API地址。
项目B提供的api接口如下:
//同步加锁用MAP
public static final ConcurrentMap<String,byte[]> SYN_MSG_MAP = new ConcurrentHashMap<String, byte[]>();
@RequestMapping(value = "pushMessage",method = RequestMethod.POST,produces = "application/json")
@ResponseBody
public String pushMessage(@RequestParam String message){
try {
if (StringUtils.isBlank(message)){
throw new BusinessException("参数不正确");
}
log.info("pushMessage begin:" + message);
//设置同步锁
if(!SYN_MSG_MAP.containsKey(message)){
SYN_MSG_MAP.putIfAbsent(message, new byte[0]);
}
//设置同步代码块
synchronized (SYN_MSG_MAP.get(message)) {
//消费信息对象并加锁
MsgConsumerMessage msgConsumerMessage = msgConsumerMessageService.consumeMessage(message);
//事务执行完成打印日志
if(msgConsumerMessage != null){
log.info("pushMessage success:" + msgConsumerMessage.toString());
}else{
log.info("pushMessage alreadySuccess:" + message);
}
}
return JsonTemplate.createSimpleJson("result", "success");
} catch (Exception e){
log.error("pushMessage failure:" + message + ",error:" + e.getMessage());
return JsonTemplate.createSimpleJson("result", "failure");
} finally {
if(SYN_MSG_MAP.containsKey(message))
SYN_MSG_MAP.remove(message);
}
}
}
为防止多条重复消息发送并发访问此api接口,这里使用了ConcurrentHashMap做为同步锁进行同步,consumeMessage方法就是同步json数据的方法,这里就不再详细写明。只需注意在同步前查询一下是否已存在MsgConsumerMessage记录,不存在则开始同步,并给需要同步的数据实体加上更新锁。如果要保证数据更新的顺序性,可在所需更新的数据的表中添加updateTime字段,同步的时候进行时间判断。同步成功后添加一条MsgConsumerMessage记录,表明已同步,同时返回给项目A同步成功标志。
网友评论