美文网首页
sentinel限流二开(2)—可插拔的分布式存储

sentinel限流二开(2)—可插拔的分布式存储

作者: 小胖学编程 | 来源:发表于2021-08-20 09:33 被阅读0次

    目前使用的是consul作为存储中心,但是如何“无痛”切换到mongodb、nacos、redis、mysql等存储中心呢?本文带你去了解。

    1. 使用“组合”+“接口”将不同存储中心“统一化”

    不同的物理存储中心,例如consul、nacos、redis、mongo都是no-sql存储,即存储k-v形式的数据。甚至mysql中也可以存储json格式的数据。虽然有各种各样形式的client实现。但若是想嵌入到咱sentinel作为存储中心,必须满足咱定义的“接口”规范。通过“组合”的形式将存储中心实际client嵌入到接口子类中,对外提供服务。

    1.1 “规范”的存储中心操作类

    接口如下:

    /**
     * 组合的方式来设置类
     */
    public interface RulesOperation<T> {
    
    
        void put(String key, String val);
    
        String get(String key);
    
        void removeMap(String key);
    
        /**
         * 通过"appRules"获取到项目名。
         * 当然,若使用redis等存储中心,返回结果可以读取某个key里面的定值,无需像consul一样实时计算~
         *
         * @param name 为appRules。
         * @return 持久化的配置名
         */
        Set<String> getAppNames(String name);
    
    
        default void putKeyStrMap(String key, ConcurrentMap<String, T> map) {
            String val;
            try {
                val = JacksonUtil.objectMapper().writeValueAsString(map);
            } catch (IOException e) {
                throw new RuntimeException("序列化失败", e);
            }
            put(key, val);
        }
    
    
        default ConcurrentMap<String, T> getKeyStrMap(String key) {
            ConcurrentMap<String, T> map = new ConcurrentHashMap<>();
            String value = get(key);
            if (value != null) {
                try {
                    map = JacksonUtil.objectMapper().
                            readValue(value, new TypeReference<ConcurrentHashMap<String, T>>() {
                            });
                } catch (IOException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }
            return map;
        }
    
    
        default void putMap(String key, ConcurrentMap<Long, T> map) {
            String val;
            try {
                val = JacksonUtil.objectMapper().writeValueAsString(map);
            } catch (IOException e) {
                throw new RuntimeException("序列化失败", e);
            }
            put(key, val);
        }
    
    
        default ConcurrentMap<Long, T> getMap(String key) {
            ConcurrentMap<Long, T> map = new ConcurrentHashMap<>();
            String value = get(key);
            if (value != null) {
                try {
                    map = JacksonUtil.objectMapper().
                            readValue(value, new TypeReference<ConcurrentHashMap<Long, T>>() {
                            });
                } catch (IOException e) {
                    throw new RuntimeException("反序列化失败", e);
                }
            }
            return map;
        }
    }
    

    没错,只需要简单的实现put、get、remove等方法,就可以无痛的替换存储中心。

    1.2 consul分布式存储中心

    /**
     * 实际的操作类——将实际的操作类进行解耦
     *
     */
    public class ConsulRulesOperation<T> implements RulesOperation<T> {
    
        private ConsulClient consulClient;
    
        private final Logger logger = LoggerFactory.getLogger(ForgeSimpleMachineDiscovery.class);
    
    
        public ConsulRulesOperation(ConsulClient consulClient) {
            this.consulClient = consulClient;
        }
    
        @Override
        public void put(String key, String val) {
            consulClient.setKVValue(genKey(key), val);
        }
    
        @Override
        public String get(String key) {
            GetValue res = consulClient.getKVValue(genKey(key)).getValue();
            String ans = null;
            if (res != null) {
                ans = res.getDecodedValue();
            }
            return ans;
        }
    
        public void removeMap(String key) {
            consulClient.deleteKVValue(genKey(key));
        }
    
        /**
         * 通过前缀获取项目名
         */
        @Override
        public Set<String> getAppNames(String prefix) {
            Response<List<String>> appRules = consulClient.getKVKeysOnly(genKey(prefix));
            List<String> results = appRules.getValue();
            //规则的截取-appRules/项目名/规则名
            Set<String> consulSet = new HashSet<>();
            if (results != null) {
                for (String result : results) {
                    try {
                        String[] arr = result.split("/");
                        consulSet.add(arr[2]);
                    } catch (Exception e) {
                        logger.error("", e);
                    }
                }
            }
            return consulSet;
        }
    
        /**
         * 生产存储的key
         * @param name
         * @return
         */
        public String genKey(String name) {
            return String.join("/", "sentinel", name);
        }
    
    }
    

    1.3 动态切换存储中心的配置

    1.3.1 config配置类

    配置文件中修改:sentinel.dashboard.store属性完成动态切换。

    @EnableConfigurationProperties(RepositoryProperties.class)
    @Configuration
    public class RepositoryConfig {
        private final Logger logger = LoggerFactory.getLogger(RepositoryConfig.class);
    
    
        /**
         * 伪造心跳的bean
         */
        @Bean
        @ConditionalOnMissingBean
        public ForgeSimpleMachineDiscovery forgeSimpleMachineDiscovery(RulesOperation RulesOperation) {
            logger.info("启动分布式机器注册[{}]", RulesOperation.getClass().getSimpleName());
            return new ForgeSimpleMachineDiscovery(RulesOperation);
        }
    
        /**
         * 分布式规则存储中心
         */
        @Bean
        @ConditionalOnMissingBean
        public <T> RulesStoreManager<T> appRulesStoreManager(RulesOperation<T> rulesOperation, Lock lock) {
            logger.info("服务维度规则的存储中心注册[{}]", rulesOperation.getClass().getSimpleName());
            return new RulesStoreManager<>(rulesOperation, lock);
        }
    
        @Bean
        @ConditionalOnMissingBean
        @ConditionalOnProperty(name = "sentinel.dashboard.lock", havingValue = "memory", matchIfMissing = true)
        public InMemoryLock inMemoryLock() {
            logger.info("内存锁启动...");
            return new InMemoryLock();
        }
    
        /**
         * 初始化consul的配置信息
         */
        @Configuration
        @ConditionalOnClass(ConsulClient.class)
        @ConditionalOnProperty(name = "sentinel.dashboard.store", havingValue = "consul")
        public static class ConsulClientConfig {
    
            private final Logger logger = LoggerFactory.getLogger(ConsulClientConfig.class);
    
            /**
             * 真实的consul客户端
             */
            @Bean
            @ConditionalOnMissingBean
            public ConsulClient consulClient(RepositoryProperties repositoryProperties) {
                logger.info("consul的客户端启动...");
                return new ConsulClient(repositoryProperties.getUrl(), repositoryProperties.getPort());
            }
    
            /**
             * 分布式无论存储的操作客户端
             */
            @Bean
            @ConditionalOnMissingBean
            public <T> RulesOperation<T> rulesOperation(ConsulClient consulClient) {
                logger.info("consul的规则操作启动...");
                return new ConsulRulesOperation<>(consulClient);
            }
    
            /**
             * 加载分布式锁
             */
            @Bean
            @ConditionalOnMissingBean
            @ConditionalOnProperty(name = "sentinel.dashboard.lock", havingValue = "consul")
            public ConsulLock consulLock(ConsulClient consulClient) {
                logger.info("consul分布式锁启动...");
                return new ConsulLock(consulClient);
            }
    
        }
    }
    
    

    1.3.2 properties配置类

    @ConfigurationProperties("sentinel.dashboard")
    public class RepositoryProperties {
    
        /**
         * 存储url
         */
        private String url;
    
        /**
         * 端口号
         */
        private int port;
    
        /**
         * 存储中心类型
         */
        private StoreEnum store;
    
        /**
         * 锁类型
         */
        private LockEnum lock;
    
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public StoreEnum getStore() {
            return store;
        }
    
        public void setStore(StoreEnum store) {
            this.store = store;
        }
    }
    

    1.3.3 配置文件

    # 存储中心
    sentinel.dashboard.url=127.0.0.1
    sentinel.dashboard.port=8500
    # 分布式存储中心的存储介质
    sentinel.dashboard.store=consul
    # 分布式存储中心读取数据锁的类型
    sentinel.dashboard.lock=consul
    

    2. 分布式存储如何在使用泛型的情况下操作map?

    为什么操作map,因为sentinel单机版就大量使用map作为存储中心,为了少修改代码,存储中心的value应该是map类型。

    当value是map类型时,就出现了一个问题:

    1. 操作不扁平:可以将consul等分布式存储中心看着一个大map,那么单机版中的map实际是consul的value值,consul操作value必须:
      1. 将JSON在consul取到内存,并反序列为map;
      2. 修改内存中的map;
      3. 将内存中的map序列化为JSON,并存到consul;
    2. 泛型操作:操作的value的值实际上是泛型类型,根据子类得到不同的泛型对象
    3. 线程不安全:单机版使用ConcurrentHashMap.computeIfAbsent在线程安全的情况下获取到value,并通过值传递的形式来修改。

    2.1 实现扁平化操作

    接口关系如下所示:实现如下接口满足map的扁平化操作。

    image.pngimage.png

    2.2 如何实现泛型操作

    重点观察泛型和接口的关系,比较复杂。

    存储实体的基本操作

    /**
     * 存储实体的管理器
     */
    public interface StoreManager<K, V> {
    
        void put(K key, V value);
    
        /**
         * 取出数据
         */
        V get(K key);
    
        V remove(K key);
    }
    
    

    map扁平化接口:

    public interface ExtendMapOperation<K, F, V> {
    
        void putField(K key, F field, V value);
    
        V getField(K key, F field);
    
        V removeField(K key, F field);
    }
    

    规则的分布式存储实现类——可以存储allRules规则以及appRules规则:

    public class RulesStoreManager<T> implements StoreManager<String, ConcurrentMap<Long, T>>, ExtendMapOperation<String, Long, T> {
    
    
        private RulesOperation<T> rulesOperation;
    
        private Lock lock;
    
        public RulesStoreManager(RulesOperation<T> rulesOperation, Lock lock) {
            this.rulesOperation = rulesOperation;
            this.lock = lock;
        }
    
        /**
         * @param key   appName
         * @param value Map<Long,T>对象
         */
        @Override
        public void put(String key, ConcurrentMap<Long, T> value) {
            //生成组装的key: 一个服务下的所有配置
            rulesOperation.putMap(key, value);
        }
    
        @Override
        public ConcurrentMap<Long, T> get(String appName) {
            return rulesOperation.getMap(appName);
        }
    
    
        @Override
        public ConcurrentMap<Long, T> remove(String key) {
            ConcurrentMap<Long, T> map = get(key);
            rulesOperation.removeMap(key);
            return map;
        }
    
    
        @Override
        public void putField(String appName, Long field, T value) {
            lock.threadSafetyFunction(appName, 10, () -> {
                ConcurrentMap<Long, T> map = get(appName);
                map.put(field, value);
                put(appName, map);
                return null;
            });
    
    
        }
    
        @Override
        public T getField(String appName, Long field) {
            Map<Long, T> map = get(appName);
            return map.get(field);
        }
    
        @Override
        public T removeField(String appName, Long field) {
    
            return lock.threadSafetyFunction(appName, 10, () -> {
                ConcurrentMap<Long, T> map = get(appName);
                T remove = map.remove(field);
                put(appName, map);
                return remove;
            });
        }
    
    }
    

    2.2.1 泛型操作遇到的坑—反序列化无法得到泛型对象

    对应FastJson来说,在反序列中传入type是可以得到泛型对象的。

    但实际上得到的却是JsonObject对象。无法转换为RuleEntity对象。

    使用Jackson也遇见了此种情况:

    map = JacksonUtil.objectMapper().
           readValue(value, new TypeReference<ConcurrentHashMap<String, T>>() {
     });
    

    2.2.2 解决方案

    自定义Jackson的ObjectMapper对象,在序列化的时候将属性的类型也序列化即可:

    public class JacksonUtil {
    
        private static ObjectMapper objectMapper = new ObjectMapper();
    
        static {
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }
    
        public static ObjectMapper objectMapper() {
            return objectMapper;
        }
    
    }
    

    2.3 如何实现线程安全

    使用的锁机制,实现了consul分布式锁以及默认的内存锁。保证“扁平化”操作时的线程安全性。

    /**
     * 锁机制
     *
     */
    public interface Lock {
    
        String lock(String lockName, int ttlSeconds);
    
        void unLock(String sessionID);
    
        /**
         * 线程安全的操作
         *
         * @param lockName   锁名称
         * @param ttlSeconds 最大超时时间
         * @param supplier   待保护的临界资源
         * @param <T>        返回值的类型
         * @return 临界资源返回值
         */
        default <T> T threadSafetyFunction(String lockName, int ttlSeconds, Supplier<T> supplier) {
            T result;
            String successId = lock(lockName, 10);
            if (successId != null) {
                try {
                    result = supplier.get();
                } finally {
                    unLock(successId);
                }
            } else {
                throw new RuntimeException("冲突啦,请稍等处理~");
            }
            return result;
    
        }
    
    }
    

    2.3.1 (默认)内存锁的实现

    借助ConcurrentHashMap将ReentrantLock存储起来。
    ——“这么骚的操作是在seata源码中借鉴的。”

    /**
     * 内存级别的锁,默认提供
     *
     */
    public class InMemoryLock implements Lock {
    
        /**
         * map维护Object的Monitor对象锁
         */
        private Map<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(64);
    
        @Override
        public String lock(String lockName, int ttlSeconds) {
            //获取key对应的内存锁
            ReentrantLock reentrantLock = lockMap.computeIfAbsent(lockName, (k) -> new ReentrantLock());
            try {
                if (reentrantLock.tryLock(ttlSeconds, TimeUnit.SECONDS)) {
                    return lockName;
                }
            } catch (InterruptedException e) {
            }
            return null;
        }
    
        @Override
        public void unLock(String sessionID) {
            if (sessionID == null) {
                return;
            }
            //获取对应的锁
            ReentrantLock reentrantLock = lockMap.get(sessionID);
            //释放锁
            reentrantLock.unlock();
        }
    }
    

    2.3.2 (重点)consul实现分布式锁

    /**
     * consul分布式锁机制
     *
     */
    public class ConsulLock implements Lock {
    
        private ConsulClient consulClient;
    
        /**
         * 构造函数
         *
         * @param consulHost 注册consul的client或服务端的Ip或主机名,或域名
         * @param consulPort 端口号
         */
        public ConsulLock(String consulHost, int consulPort) {
            consulClient = new ConsulClient(consulHost, consulPort);
        }
    
    
        public ConsulLock(ConsulClient consulClient) {
            this.consulClient = consulClient;
        }
    
        /**
         * 获得锁的方法
         *
         * @param lockName   竞争的资源名
         * @param ttlSeconds 锁的超时时间,超过该时间自动释放【超时时间,最低10秒】
         * @return 若为null表示加锁失败,若非null,表示加锁成功
         */
        public String lock(String lockName, int ttlSeconds) {
            if (ttlSeconds < 10 || ttlSeconds > 86400) ttlSeconds = 10;
            String sessionId = createSession(lockName, ttlSeconds);
            //个性化处理
            boolean success = lock("sentinel/lock/" + lockName, sessionId);
            if (!success) {
                consulClient.sessionDestroy(sessionId, null);
                return null;
            }
            return sessionId;
        }
    
        public void unLock(String sessionID) {
            consulClient.sessionDestroy(sessionID, null);
        }
    
        private String createSession(String lockName, int ttlSeconds) {
            NewCheck check = new NewCheck();
            check.setId("check " + lockName);
            check.setName(check.getId());
            check.setTtl(ttlSeconds + "s"); //该值和session ttl共同决定决定锁定时长
            check.setTimeout("10s");
            consulClient.agentCheckRegister(check);
            consulClient.agentCheckPass(check.getId());
    
            NewSession session = new NewSession();
            session.setBehavior(Session.Behavior.RELEASE);
            session.setName("session " + lockName);
            session.setLockDelay(1);
            session.setTtl(ttlSeconds + "s"); //和check ttl共同决定锁时长
            List<String> checks = new ArrayList<>();
            checks.add(check.getId());
            session.setChecks(checks);
            return consulClient.sessionCreate(session, null).getValue();
        }
    
    
        private boolean lock(String lockName, String sessionId) {
            PutParams putParams = new PutParams();
            putParams.setAcquireSession(sessionId);
            return consulClient.setKVValue(lockName, "lock:" + LocalDateTime.now(), putParams).getValue();
        }
    
    }
    

    2.3.3 按照配置切换锁

    在1.3小节的“动态切换”中,可以根据配置文件来动态切换锁。当然以后去除consul使用redis等存储中心,可以实现定义的Lock接口,实现Redis的分布式锁。

    相关文章

      网友评论

          本文标题:sentinel限流二开(2)—可插拔的分布式存储

          本文链接:https://www.haomeiwen.com/subject/qiozbltx.html