美文网首页
一个简单的分布式系统数据同步解决方案

一个简单的分布式系统数据同步解决方案

作者: 雨中独奏 | 来源:发表于2018-06-29 23:08 被阅读0次

在分布式系统中,最常见的问题就是如何保证数据一致性,本文主要解决最终一致性。项目A中表A进行新增和修改操作时,项目B中的表B也需要进行新增和修改,生产者为A,消费者为B。
在项目A中新建实体MsgConsumerMessage,在项目B中新建表MsgProducerMessage:

public class MsgConsumerMessage {
    //主键ID,对应MsgProducerMessage的ID,不能设置为自增
    private Long id;
    //消息服务类型ID
    private Integer serviceId;
    //消息要同步的json数据
    private String params;
    //同步的时间
    private Long consumTime;
}
public class MsgProducerMessage {
    //主键ID
    private Long id;
    //消息服务类型ID
    private Integer serviceId;
    //消费者提供的API地址,用于push消息给消费者
    private String node;
    //要push的json数据
    private String params;
    //消息创建时间
    private Long createTime;
    //是否同步
    private Boolean sync;
}

在项目A需要同步数据的方法中,新增一条MsgProducerMessage记录,使用httpclient向项目B发送请求,接收到项目B接口的返回值后,r若成功则将sync改为true,失败则不改。另在项目A中添加定时任务,循环遍历未同步的MsgProducerMessage记录,依次访问消费者的API地址。
项目B提供的api接口如下:

//同步加锁用MAP
    public static final ConcurrentMap<String,byte[]> SYN_MSG_MAP = new ConcurrentHashMap<String, byte[]>();

    @RequestMapping(value = "pushMessage",method = RequestMethod.POST,produces = "application/json")
    @ResponseBody
    public String pushMessage(@RequestParam String message){
        try {
            if (StringUtils.isBlank(message)){
                throw new BusinessException("参数不正确");
            }
            log.info("pushMessage begin:" + message);
            //设置同步锁
            if(!SYN_MSG_MAP.containsKey(message)){
                SYN_MSG_MAP.putIfAbsent(message, new byte[0]);
            }
            //设置同步代码块
            synchronized (SYN_MSG_MAP.get(message)) {
                //消费信息对象并加锁
                MsgConsumerMessage msgConsumerMessage = msgConsumerMessageService.consumeMessage(message);
                //事务执行完成打印日志
                if(msgConsumerMessage != null){
                    log.info("pushMessage success:" + msgConsumerMessage.toString());
                }else{
                    log.info("pushMessage alreadySuccess:" + message);
                }
            }
           return JsonTemplate.createSimpleJson("result", "success");
        } catch (Exception e){
            log.error("pushMessage failure:" + message + ",error:" + e.getMessage());
            return JsonTemplate.createSimpleJson("result", "failure");
        } finally {
            if(SYN_MSG_MAP.containsKey(message))
                SYN_MSG_MAP.remove(message);
        }

    }
}

为防止多条重复消息发送并发访问此api接口,这里使用了ConcurrentHashMap做为同步锁进行同步,consumeMessage方法就是同步json数据的方法,这里就不再详细写明。只需注意在同步前查询一下是否已存在MsgConsumerMessage记录,不存在则开始同步,并给需要同步的数据实体加上更新锁。如果要保证数据更新的顺序性,可在所需更新的数据的表中添加updateTime字段,同步的时候进行时间判断。同步成功后添加一条MsgConsumerMessage记录,表明已同步,同时返回给项目A同步成功标志。

相关文章

网友评论

      本文标题:一个简单的分布式系统数据同步解决方案

      本文链接:https://www.haomeiwen.com/subject/ibhzyftx.html