美文网首页
聊聊多个节点实例数据同步如何触发

聊聊多个节点实例数据同步如何触发

作者: linyb极客之路 | 来源:发表于2023-10-23 10:01 被阅读0次

    前言

    之前写过一篇文章聊聊在集群环境中本地缓存如何进行同步,今天聊的话题看着和那篇文章有点雷同,不过我们今天重点会放在方法论上,也不会拘泥于具体实现。在聊这个话题之前,大家可以思考一下,如果要实现多个实例数据同步触发,大家会怎么做?脑海里,是会浮现,我可以用消息队列或者定时器来实现?这种已经具象化的技术细节?还是进一步进行拆解?

    假设大家已经思考好,我来说下我个人的思考逻辑。今天标题的内容,主要讲同步如何触发?内容已经圈定死,因此就不谈数据同步涉及的一致性,只谈如何触发这个动作。多节点实例触发的关键是,一旦触发,各个节点都要通知到位。那如何进行多个节点通知呢?答案就是通过广播。那如何感知是否通知到位呢?这个还真不好搞,那我们换个思路,如果通知不到位,我们的措施会是啥?正常我们的思路,会是通过补偿机制

    今天我们聚焦在广播这个动作,补偿机制暂不在本文讨论。下面通过一个案例实操下

    本案例核心流程图

    703eca7227f4beb57f0e6052321c9b2e_4dc98ddc5a9209dc2bbeeaecde33dc4b.png

    从图中,我们会发现本案例是通过一个中间件来实现。那这个中间件是啥?是rocketmq、kafka还是其他具有广播功能的组件或者服务?答案是也不是。怎么说?我们这个中间件,其实是一层高层广播抽象,而非具体实现

    实现步骤

    1、定义高层广播抽象接口

    @FunctionalInterface
    public interface DataSyncTrigger {
    
        void broadcast(Object data);
    }
    
    

    2、定义通知事件类

    注: 本文会采用spring的事件监听模式实现

    public class DataSyncTriggerEvent extends ApplicationEvent {
        /**
         * Create a new ApplicationEvent.
         *
         * @param source the object on which the event initially occurred (never {@code null})
         */
        public DataSyncTriggerEvent(Object source) {
            super(source);
        }
    }
    
    

    3、定义高层抽象广播的模板基类

    @RequiredArgsConstructor
    public abstract class BaseDataSyncTrigger implements DataSyncTrigger, ApplicationContextAware {
        protected ApplicationContext applicationContext;
        
        protected final DataSyncTriggerProperty dataSyncTriggerProperty;
    
    
        @Override
        public void broadcast(Object data) {
            DataSyncTriggerEvent dataSyncTriggerEvent = new DataSyncTriggerEvent(data);
            applicationContext.publishEvent(dataSyncTriggerEvent);
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        private Collection<DataSyncTriggerCallBack> listDataSyncTriggerCallBacks(){
            try {
                Map<String, DataSyncTriggerCallBack> dataSyncTriggerCallBackMap = applicationContext.getBeansOfType(DataSyncTriggerCallBack.class);
                return Collections.unmodifiableList(dataSyncTriggerCallBackMap.values().stream().collect(Collectors.toList()));
            } catch (BeansException e) {
    
            }
    
            return Collections.emptyList();
        }
        
        public void callBack(Object data){
            Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks = listDataSyncTriggerCallBacks();
            if(CollectionUtil.isNotEmpty(dataSyncTriggerCallBacks)){
                if(dataSyncTriggerProperty.isTriggerCallBackAsync()){
                    callbackAsync(data, dataSyncTriggerCallBacks);
                }else{
                    callbackSync(data, dataSyncTriggerCallBacks);
                }
              
            }
        }
    
        private  void callbackSync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {
            for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {
                dataSyncTriggerCallBack.execute(data);
            }
        }
    
        private  void callbackAsync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {
            for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {
                ThreadUtil.execAsync(()->{
                    dataSyncTriggerCallBack.execute(data);
                });
            }
        }
    }
    
    

    4、定义抽象回调接口【扩展点】

    当业务收到通知,可以通过该回调接口进行具体业务操作

    @FunctionalInterface
    public interface DataSyncTriggerCallBack {
    
        void execute(Object data);
    }
    

    5、定义具体广播实现类

    注: 这个广播的具体实现方案就很多了,只要天生具备广播能力或者基于原来特性扩展出广播的组件都可以,比如rocketmq的广播机制、redis的pubsub机制、zookeeper的分布式协调能力、基于注册中心服务发现能力改造出来的广播能力等。本文就以redis的pubsub机制为例

    Slf4j
    public class RedisDataSyncTrigger extends BaseDataSyncTrigger implements CommandLineRunner {
    
    
        private final RedisTemplate redisTemplate;
        
    
        public RedisDataSyncTrigger(RedisTemplate redisTemplate, DataSyncTriggerProperty dataSyncTriggerProperty) {
            super(dataSyncTriggerProperty);
            this.redisTemplate = redisTemplate;
        }
    
        @EventListener
        public void listener(DataSyncTriggerEvent dataSyncTriggerEvent){
            SyncDataDTO syncDataDTO = SyncDataDTO.builder()
                    .data(dataSyncTriggerEvent.getSource())
                            .timeStamp(System.currentTimeMillis())
                                    .build();
            try {
                redisTemplate.convertAndSend(REDIS_CHANNEL_KEY, syncDataDTO);
            } catch (Exception e) {
               log.error("redis publish channel 【" + REDIS_CHANNEL_KEY + "】 fail,cause:" + e.getMessage(),e);
            }
        }
    
    
        @Override
        public void run(String... args) throws Exception {
            doSubscribe();
        }
    
        @SneakyThrows
        private void doSubscribe() {
            RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
            RedisMessageListener redisMessageListener = applicationContext.getBean(RedisMessageListener.class);
            connection.subscribe(redisMessageListener,REDIS_CHANNEL_KEY.getBytes("utf-8"));
            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Register listen channel : 【{}】",REDIS_CHANNEL_KEY);
        }
    }
    
    

    具体redis订阅监听实现

    @RequiredArgsConstructor
    @Slf4j
    public class RedisMessageListener implements MessageListener{
    
        private final BaseDataSyncTrigger baseDataSyncTrigger;
    
        private final RedisTemplate redisTemplate;
    
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            byte[] body = message.getBody();
            String dataJson = StrUtil.str(body, "utf-8");
            if(JSONUtil.isJson(dataJson)){
                try {
                    SyncDataDTO dataDTO = (SyncDataDTO) redisTemplate.getHashValueSerializer().deserialize(body);
                    baseDataSyncTrigger.callBack(dataDTO.getData());
                } catch (Exception e) {
                    log.error(e.getMessage(),e);
                }
            }else{
                log.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data 【{}】 is not match json format !!!",dataJson);
            }
    
        }
    }
    
    

    6、测试验证

    a、编写业务逻辑类

    @Service
    @RequiredArgsConstructor
    @Slf4j
    public class DataService {
    
        private List<Object> dataList = new CopyOnWriteArrayList<>();
    
        private final RedisTemplate redisTemplate;
    
        private final BaseDataSyncTrigger dataSyncTrigger;
    
        public boolean add(String data){
            try {
                Long count = redisTemplate.opsForList().leftPush(RedisConstant.REDIS_LIST_KEY, data);
                if(count > 0){
                    dataSyncTrigger.broadcast(data);
                    return true;
                }
            } catch (Exception e) {
               log.error("add fail:" + e.getMessage(),e);
            }
    
            return false;
    
        }
    
        public List<Object> getDataList(){
            return dataList;
        }
    }
    

    b、编写业务控制器

    @RestController
    @RequestMapping("data")
    @RequiredArgsConstructor
    public class DataController {
        
    
    
        private final DataService dataService;
    
        @GetMapping("add/{data}")
        public String syncData(@PathVariable("data") String data){
            boolean isSuccess = dataService.add(data);
            return isSuccess ? "success" : "fail";
        }
    
        @GetMapping("list")
        public List<Object> listData(){
            return dataService.getDataList();
        }
    }
    
    

    c、编写业务回调类

    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class LocalListDataSyncTriggerCallBack implements DataSyncTriggerCallBack {
    
        private final DataService dataService;
    
        @Override
        public void execute(Object data) {
            dataService.getDataList().add(data);
            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sync data:-->{}",data);
        }
    }
    

    d、小细节

    注: 当项目重启时,本地存储容器是没内容的,因此需要在项目重启时,写一个钩子,从其他缓存介质将数据刷到本地存储中

    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class DataInitTask implements CommandLineRunner {
    
        private final RedisTemplate redisTemplate;
    
        private final DataService dataService;
    
    
        @Override
        public void run(String... args) throws Exception {
            List redisDataList = redisTemplate.opsForList().range(RedisConstant.REDIS_LIST_KEY, 0, -1);
            if(CollectionUtil.isNotEmpty(redisDataList)){
                dataService.getDataList().addAll(redisDataList);
                log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Loaded data from redis finished!!!");
            }
    
        }
    
    
    }
    
    

    e、测试

    从一个节点(示例:54860端口)添加数据,如图

    5783665f1a3eef84b3d9eabf1f73c856_3658dff9dddb0888e61e6158274518dc.png

    观察其他节点(示例:59829端口)本地存储是否接收到数据

    391f57e9aea4f9e7435fd38b1925f1e4_54e76766e4aaacdf8e58feff6e326cb5.png

    从图可以发现已经收到数据,同时我们观察控制台


    1a3e0654bc8b83edd298171005098697_09d9e70be00a18a2924e634ce61b382e.png

    可以看出业务回调已经触发

    总结

    本文介绍了通过redis pubsub实现广播效果,示例代码中也提供基于注册中心以及配置中心apollo来实现广播的效果。基于篇幅就不再论述了,感兴趣的朋友,可以查看下方demo链接。本文除了介绍多个节点实例数据同步如何触发之外,其实还有实现一个通用组件套路原则--依赖倒置原则。高层定义抽象,程序依赖高层抽象,也不依赖具体实现,这样后续才比较好扩展

    demo链接

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-localdata-sync

    相关文章

      网友评论

          本文标题:聊聊多个节点实例数据同步如何触发

          本文链接:https://www.haomeiwen.com/subject/pvadidtx.html