美文网首页
微服务动态属性配置

微服务动态属性配置

作者: shiguangfeixu | 来源:发表于2019-06-08 21:58 被阅读0次

1 需求介绍

使用Spring Boot开发微服务的过程中难免会遇到配置问题,常见的配置方式为:配置文件设置、数据库动态配置以及数据库加缓存进行配置等。

1.1 不同配置方式的优缺点分析

  • 配置文件配置:通过读取properties中的配置,将代码中用到的属性抽取出来放入配置文件,代码中通过@Value注解读取属性的值,这样方便对配置进行维护和管理,有助于提高代码的可读性和可维护性。但是@Value在Spring初始化的时候对值进行读取,一旦服务创建好之后不能修改,如果想修改某一个配置,尤其是变化较为频繁的字段,需要重启整个服务,不方便。


  • 读取数据库获得动态配置: 动态的读取数据库中的字段是一种非常有效的动态配置方式,后台只需要将相关的属性存储在数据库中,当数据库中的值修改之后,代码中会及时获取到新的属性值,从而完成动态配置。但是这种配置方式会频繁的读取数据库,如果每次查询操作都会用到数据库中的某个配置,会造成数据库热点问题,影响性能。


  • 读取数据库+缓存获得动态配置:如果既要保证配置的动态性又要考虑性能,可以通过缓存的方式将硬盘中数据库中的属性值放在cache中(30分支定时从数据库中读取一次),从而减少磁盘的读取操作同时也提高了读取效率,但是对缓存的要求较高,如果某个配置需要保证实时性(紧急情况下对服务降级的配置),及时数据库中的值以及更新,但是缓存还没有更新,不能够保证数据一致性。需要在可用性和一致性中寻求一个平衡。


1.2 配置方式的改进

通过分析上述三种配置方式的优缺点,我们希望引入一个统一的配置中心,能够对不同分支,不同服务,不同代码进行动态配置,同时也要保证时效性和可用性。这里在数据库+缓存的基础上,引入消息中间件,如果管理员对代码配置进行修改,能够通过消息及时通知到服务,然后刷新缓存,获取修改后的消息(不过如果配置频繁修改,该方法还需要改进,但是可以保证缓存与数据库在修改之后的一致性)。


2 框架介绍

这里通过配置中心接口对数据库中的值进行修改,同时其他服务通过数据库+缓存的方式从数据库中读取配置,配置修改之后如何在不同服务之间进行通知,这里采用消息中间件进行通知,或者采用RPC的方式更快,其他服务Service1-3通过订阅不同配置的消息,如果有对数据库中配置的修改操作,会对订阅消息的服务发送请求,重新刷新缓存数据,旧的缓存失效,起到即时通知的作用。


3 代码介绍

3.1 config模块

承担配置中心的作用,用于数据库中配置的存储以及Kafka(这里采用kafka作为消息中间件)消息的通知。

  • 3.1.1配置存储
@Service
@Slf4j
public class ConfigServiceImpl implements ConfigService {

    @Autowired
    private ConfigRepo configRepo;

    @Autowired
    private KafkaSender sender;

    @Override
    public Boolean save(Config config) {
        if(config.getKey()==null||config.getValue()==null){
            log.error("the config is not enabled!");
            return false;
        }
        try{
            Optional<Config> configDB = configRepo.findByKey(config.getKey());
            if(configDB.isPresent()){
                Config configTemp = configDB.get();
                configTemp.setValue(config.getValue());
                configRepo.save(configTemp);
            }else{
                configRepo.save(config);
            }
            sender.send(config.getKey(), config.getValue());
            return true;
        }catch (Exception e){
            return false;
        }
    }

    @Override
    public String query(String key) {
        if(key==null){
            log.error("the key is null!");
            return null;
        }
        Optional<Config> config = configRepo.findByKey(key);
        if(config.isPresent()){
            return config.get().getValue();
        }
        return null;
    }

}
  • 3.1.2 消息发送
@Component
@Slf4j
@PropertySource("classpath:config/kafka.properties")
public class KafkaSender {

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   private Gson gson = new GsonBuilder().create();

   @Value("${kafka.topic}")
   private String topic;

   public void send(String key, String value) {
       Message message = new Message();
       message.setId(System.currentTimeMillis());
       message.setKey(key);
       message.setValue(value);
       message.setSendTime(new Date().toString());
       log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));
       kafkaTemplate.send(topic, key, gson.toJson(message));
   }
}

3.2 service模块

简单实现了一个基本的配置查询操作(这里使用Caffeine缓存),以及Kafka消息订阅。

  • 3.2.1 添加缓存读取数据库中的值
@Service
@Slf4j
public class ConfigServiceImpl implements ConfigService {

    @Autowired
    private ConfigRepo configRepo;

    @Autowired
    @Qualifier(DEVICE_CACHE)
    private Caffeine<Object, Object> caffeineBuilder;

    private LoadingCache<String, String> cache;

    @PostConstruct
    public void initCache() {
        cache = caffeineBuilder.build(new CacheLoader<String, String>() {
            @Override
            public String load(String key) throws Exception {
                log.info("fail to hit cache for key={}, try to find it via RPC.", key);
                String value = queryValueFromDB(key);
                if (value == null) {
                    log.info("fail to hit cache for key={}, the value is null.", key);
                    return null;
                }
                return value;
            }

            @Override
            public String reload(String key, String oldValue)throws Exception{
                return load(key);
            }
        });
    }

    public String queryValueFromDB(String key){
        Optional<Config> config = configRepo.findByKey(key);
        if(config.isPresent()){
            return config.get().getValue();
        }
        return null;
    }

    /**
     * 手动刷新缓存
     * @param key
     */
    public void invalidateCache(String key){
        cache.refresh(key);
    }

    @Override
    public String query(String key) {
        if(key==null){
            log.error("the key is null!");
            return null;
        }
        String config = cache.get(key);
        return config;
    }
}
  • 3.2.2 kafka消息订阅
@Component
@Slf4j
@PropertySource("classpath:config/kafka.properties")
public class Listener {

    @Value("${kafka.topic}")
    private String topic;

    @Autowired
    private ConfigServiceImpl service;

    @KafkaListener(groupId = "group0", topics = "shgx")
    public void listener(ConsumerRecord<?, ?> cr) throws Exception {
        Message message = JSONObject.parseObject((String) cr.value(), Message.class);
        String key = (String) cr.key();
        String value = message.getValue();
        String oldValue = service.query(key);
        if(!oldValue.equals(value)){
            //原来的值已经更新,重新从数据库中读取缓存
            service.invalidateCache((String) cr.key());
        }
        log.info("+++++++++++++++++++++  topic = {}, key = {}, value = {}.", cr.topic(), cr.key(), cr.value());
    }
}

4 效果分析

4.1 数据插入:

http://localhost:8081/config/manage/save

发送JSON数据

{
  "key":"hello",
  "value":"world",
  "branch":"dev",
  "version":"v1.0",
  "date":"2019-06-08"
}

数据库存储值,并通过kafka发送消息:


4.2 数据查询

http://localhost:8080/config/manage/search/hello

多次点击查询,但是只有一条SQL查询语句,数据以及保存在了缓存中。


4.3 配置更新

这时修改hello的值为world!!!



service模块收到消息通知并更新了缓存:




5 完整源码

github参考

相关文章

网友评论

      本文标题:微服务动态属性配置

      本文链接:https://www.haomeiwen.com/subject/sqlwxctx.html