美文网首页首页投稿(暂停使用,暂停投稿)程序员
30 带你利用zookeeper 分布式锁解决缓存重建冲突

30 带你利用zookeeper 分布式锁解决缓存重建冲突

作者: 逐暗者 | 来源:发表于2017-10-03 22:53 被阅读0次

    上一篇 分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案, 主要讲解了分布式缓存重建冲突原因及利用zookeeper分布式锁解决缓存重建冲突问题,本篇接着上篇,实现上篇思路,带你利用zookeeper代码实现分布式锁解决重建缓存冲突问题。

    缓存重建分析图

    从上图我们可以看出:

    • 缓存主动更新
      我们监听kafka中的缓存操作消息队列,当接收到一个商品变更消息后,我们会立即根据源数据服务获取商品最新信息,然后更新到ehcache 和 redis cluster 中,这种情况笔者将之称为缓存主动更新
    • 缓存被动重建
      当nginx 请求获取商品信息时,发现redis cluster 和 ehcache 中都没有获取到相关商品信息,这时候就需要到源数据服务中拉取商品信息,这时候我们需要同步更新到redis cluster 和 ehcache 中,然后返回nginx 并进行nginx 本地缓存,这种情况笔者将之称为缓存被动重建

    那么前面一章中,笔者分析了缓存重建情况

    这里笔者在着重讲下,当缓存数据由于个方面因素(如LRU等算法)清理了,这时候缓存主动更新 和 缓存在高并发或者特殊情况下,同时进行时,缓存重建冲突就悲剧的发生了(注:上篇说多个缓存服务实例时,出现分布式缓存重建冲突没错,但是就算不是多缓存实例服务,单个也会发生,只要两者同时发生即可,这里着重补充一下

    上篇讲了可以通过分布式锁方案解决

    zookeeper分布式锁的解决逻辑思路

    • 变更缓存重建或者空缓存请求重建,更新redis之前,先获取对应商品id的分布式锁
    • 拿到分布式锁后,做时间版本比较,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
    • 如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁

    那下面我们就进入coding 环节,来吧!(注:以下代码层面只针对分布式锁,其他不做介绍,具体其他设计实现会单独剥离讲解

    首先,我们来写个zookeeper 工具类,提供 获取锁 acquireDistributedLock、释放锁releaseDistributedLock 方法

    • 添加zookeeper client 依赖(注:如果你添加了kafka 依赖,这里就不需要单独依赖了
    <dependency>
          <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>3.4.5</version>
         <exclusions>
          <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
        </exclusions>
    </dependency>
    
    • 创建 zookeeperSession 工具类,代码如下,对应代码都有注释说明,故不做过多解释了
    /**
     * 
     * zookeeper 分布式锁工具类
     * @author bill
     * @date 2017年10月3日 上午11:39:08
     */
    public class ZookeeperSession {
        
        private ZooKeeper zookeeper;
        //计数器(同步锁),连接信号量,用于控制并发请求时,确保 zookeeper client 与 server 已连接
        private static CountDownLatch connectSemaphore = new CountDownLatch(1);
        
        private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperSession.class);
        
        public ZookeeperSession(){
            try {
                // 连接 zookeeper server
                this.zookeeper = new ZooKeeper("192.168.0.16:2181,192.168.0.17:2181,192.168.0.18:2181", 50000, new ZookeeperWatcher());
                // 等待,保证 client、server连接
                connectSemaphore.await();
                LOGGER.debug(" zookeeper session established ...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        /**
         *  获取分布式锁
         * @param productId 商品id
         */
        public void acquireDistributedLock(Long productId){
            String path = "/product-lock-" + productId;
            try {
                // 尝试获取分布式锁
                zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
                LOGGER.debug("success to acquire lock for productId [{}]", productId);
            } catch (Exception e) {
                // 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
                int count = 0;
                while(true){
                    try {
                        // 睡眠一下下,为了测试效果,生产环境,可以20ms
                        Thread.sleep(1000);
                        // 再次尝试获取分布式锁
                        zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    } catch (Exception e2) {
                        // 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
                        LOGGER.debug("repeat to acquire lock for productId:[{}] - count:[{}] ...", productId, count);
                        count ++;
                        continue;
                    }
                    LOGGER.debug("success to acquire lock for productId:[{}] after count:[{}] repeat 。。。", productId, count);
                    break;
                }
            }
        }
        
        /**
         * 释放分布式锁
         * @param productId 商品id
         */
        public void releaseDistributedLock(Long productId){
            String path = "/product-lock-" + productId;
            try {
                // 删除node,释放锁
                zookeeper.delete(path, -1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 创建 zookeeper session watcher
         * @author bill
         * @date 2017年10月3日 下午12:05:21
         */
        private class ZookeeperWatcher implements Watcher{
    
            @Override
            public void process(WatchedEvent evt) {
                LOGGER.debug("receive zookeeper watched event: {}", evt.getState());
                if(KeeperState.SyncConnected == evt.getState()){
                    // client、server 已连接 是否等待信号量锁
                    connectSemaphore.countDown();
                }
            }
            
        }
        
        
        /**
         * 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
         * 静态内部类的方式,去初始化单例
         */
        private static class Singleton {
            
            private static ZookeeperSession instance;
            
            static{
                instance = new ZookeeperSession();
            }
            
            public static ZookeeperSession getInstance(){
                return instance;
            }
        }
        
        /**
         * jvm 的机制去保证多线程并发安全
         * 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
         */
        public static ZookeeperSession getIntance(){
            return Singleton.getInstance();
        }
    
        /**
         * 初始化单例 zookeeperSession
         */
        public static void init(){
            getIntance();
        }
    }
    

    好,工具类写完了呢,我们不要忘了,对它进行初始化,笔者这里就直接在 InitListener 监听器中初始化了,如下图:

    初始化zookeeper

    那,接下来要干什么呢,自然就是缓存主动更新 或者 缓存被动重建代码中,加入分布式锁啦,让多个请求串行执行,即可

    下面笔者先讲 缓存主动更新 中怎么做,既然我们是通过接收kafka 商品变更消息去更新缓存,那么对应的就是在消费kafka 消息的时候先获取分布式锁,得到锁后,对比时间版本,决定是否更新缓存

    缓存主动更新

    我们找呀找,终于找到了消费kafka 商品变更消息线程 KafkaMessageProcessor

    在更新之前先获取锁,得到锁后,先获取redis 中 数据跟 当前商品数据时间版本对比,当前数据比缓存数据更靠后(更新),则更新,相关代码如下:

            // 在数据写入redis 缓存之前,先获取 zookeeper 分布式锁,确保缓存重建冲突
            ZookeeperSession zkSession = ZookeeperSession.getIntance();
            zkSession.acquireDistributedLock(productId);
            
            // 获取到了锁
            // 先从redis 中获取当前最新数据
            ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productId);
            if(null != redisLastProductInfo){
                //比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
                try {
                    Date date = sdf.parse(productInfo.getModifiedTime());
                    Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
                    if(date.before(redisLastProductInfoDate)){
                        LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);
                        // 无需更新,直接返回
                        return;
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                LOGGER.debug("current product info date is after redis product info date , to update redis");
            }else{
                LOGGER.debug("product Info is null, to update redis");
            }
            
            /** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- start*/
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- end*/
            
            // 更新本地 ehcache 缓存
            cacheService.saveProductInfo2LocalCache(productInfo);
            LOGGER.debug("获取刚保存到本地缓存的商品信息:[{}]", cacheService.getProductInfoFromLocalCache(productId));
            
            // 更新redis 缓存
            cacheService.saveProductInfo2RedisCache(productInfo);
            // 释放 zookeeper 分布式锁
            zkSession.releaseDistributedLock(productId);
    

    好了,缓存主动更新就完了,其实就这么简单,你懂了没有?

    接下来继续,缓存被动重建那就是从http 入手了,请求进来后,先到redis cluster中获取商品数据,发现没有,然后又到本地ehcache 中获取,发现也没有,这时候就到源数据服务中拉取mysql 商品数据,这时候是不是要更新到redis cluster 以及 ehcache 中呢,那是必须的,所以这里就有发生缓存重建冲突的可能。

    缓存被动重建

    注:这里的核心实现和缓存主动更新差不多,但是处理流程稍微有点不一样

    • 创建一个缓存重建队列,提供加入队列、获取队列数据方法
    • 创建一个缓存重建队列消费线程,设置商品数据缓存,同时做缓存重建冲突处理
    • 请求进来,如果缓存中都没有商品数据,到源数据服务拉取商品数据,然后将商品数据加入缓存重建队列,同时响应http 商品数据

    下面直接 coding 吧!

    • 创建一个缓存重建队列 RebuildCacheQueue,这是一个单例类,代码如下:
    /**
     * 
     * 重建缓存的内存队列
     * @author bill
     * @date 2017年10月3日 上午11:39:48
     */
    public class RebuildCacheQueue {
        
        /**
         * 内存队列
         */
        private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<ProductInfo>(1000);
        
        
        /**
         * 将商品信息对象加入队列
         * @param productInfo 商品信息对象
         */
        public void putProductInfo(ProductInfo productInfo){
            try {
                queue.put(productInfo);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        /**
         *  从队列中获取商品信息对象
         * @return 商品信息对象
         */
        public ProductInfo takeProductInfo(){
            try {
                return queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
        
        /**
         * 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
         * 静态内部类的方式,去初始化单例
         */
        private static class Singleton {
            
            private static RebuildCacheQueue instance;
            
            static {
                instance = new RebuildCacheQueue();
            }
            
            private static RebuildCacheQueue getInstance(){
                return instance;
            }
        }
        
        /**
         * jvm 的机制去保证多线程并发安全
         * 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
         */
        public static RebuildCacheQueue getInstance(){
            return Singleton.getInstance();
        }
    }
    
    • 创建一个缓存重建队列消费线程 RebuilCacheThread,进行重建缓存队列消费,没什么好说的了,上面流程已经讲了很清楚了,直接看代码吧:
    /**
     * 
     * 重建缓存队列消费线程
     * @author bill
     * @date 2017年10月3日 上午11:52:40
     */
    public class RebuilCacheThread implements Runnable{
        
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProcessor.class);
        
        private static final SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
    
        @Override
        public void run() {
            // 获取重建缓存队列实例
            RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance(); 
            // 获取zookeeperSession 实例
            ZookeeperSession zkSession = ZookeeperSession.getIntance();
            CacheService cacheService = SpringContext.applicationContext.getBean(CacheService.class);
            while(true){
                ProductInfo productInfo = rebuildCacheQueue.takeProductInfo();
                // 获取zookeeper分布式锁
                zkSession.acquireDistributedLock(productInfo.getId());
                // 获取到了锁
                // 先从redis 中获取当前最新数据
                ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productInfo.getId());
                if(null != redisLastProductInfo){
                    //比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
                    try {
                        Date date = sdf.parse(productInfo.getModifiedTime());
                        Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
                        if(date.before(redisLastProductInfoDate)){
                            LOGGER.debug("无需更新  > 现有数据 date:[{}] - before redis 最新版本  date:[{}]", date, redisLastProductInfoDate);
                            // 无需更新,直接返回
                            continue;
                        }
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                    LOGGER.debug("current product info date is after redis product info date , to update redis");
                }else{
                    LOGGER.debug("product Info is null, to update redis");
                }
                // 更新本地 ehcache 缓存
                cacheService.saveProductInfo2LocalCache(productInfo);
                // redis 缓存
                cacheService.saveProductInfo2RedisCache(productInfo);
                // 释放 zookeeper 分布式锁
                zkSession.releaseDistributedLock(productInfo.getId());
            }
        }
    }
    
    • http 请求时,无缓存数据,重建缓存,加入重建缓存队列
        /**
         * 获取商品信息
         * @param productId 商品id
         * @return 商品信息
         */
        @GetMapping("/getProductInfo")
        public ProductInfo getProductInfo(Long productId){
            ProductInfo productInfo = null;
            try {
                productInfo = cacheService.getProductInfoFromRedisCache(productId);
                LOGGER.debug("从redis中获取缓存,商品信息: {}", productInfo); 
                if(productInfo == null){
                    productInfo = cacheService.getProductInfoFromLocalCache(productId);
                    LOGGER.debug("从ehcache中获取缓存,商品信息: {}", productInfo);
                }
                if(productInfo == null){
                    //走 数据源重新拉数据并重建缓存,注意这里笔者就直接写死数据了
                    String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";
                    productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
                    // 将数据推送到一个内存队列中消费(重建缓存的内存队列)
                    RebuildCacheQueue.getInstance().putProductInfo(productInfo);
                }
            } catch (Exception e) {}
            return productInfo;
        }
    

    好了,缓存主动更新 和 缓存被动重建三部曲就完了,可以很清楚的看到,代码量不多,主要集中在更新缓存前获取分布式锁即可,结合缓存重建分析图

    接下来要干啥呢,有个很重要的环节没有做,那就是测试,不到黄河不死心,看不到效果,我也不信,那我们就拿出来遛遛吧

    代码测试

    测试数据

    缓存主动更新

    kafka 生产数据:

     {"serviceId":"productInfoService","productId":10}
    

    从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:00):

    String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:00\"}";
    
    缓存被动重建

    http 请求:

    http://localhost:81/getProductInfo?productId=10
    

    从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:01):

    String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";
    

    准备环境:
    启动 redis cluster
    启动 zookeeper 集群
    启动 kafka 集群
    启动 缓存服务(缓存项目服务)

    注:由于笔者使用Windows 系统,可能避免windows 中 centos ip与主机名映射不了,所以作以下配置,推荐使用 SwitchHosts 的工具,很方便

    C:\Windows\System32\drivers\etc\hosts
        *   ################################### 配置本地hosts #################### 很重要 ######################
        *   # 缓存架构方案
        *       192.168.0.16 my-cache1
        *       192.168.0.17 my-cache2
        *       192.168.0.18 my-cache3
        *   ################################### 配置本地hosts #################### 很重要 ######################
    
    利用kafka-console-producer.sh 生产一条商品变更消息,并回车(由于代码中更新时为了演示效果,休眠了10s,把握时间)

    cd /usr/local/kafka && bin/kafka-console-producer.sh --broker-list my-cache1:9092,my-cache2:9092,my-cache3:9092 --topic cache-message

    商品消息如下:

    {"serviceId":"productInfoService","productId":10}

    浏览器 http 请求

    http://localhost:81/getProductInfo?productId=10

    测试效果

    期望效果,先看到控制台打印kafka 的消费日志,等kafka 消费线程释放分布式锁后,才能看到缓存被动重建 获取分布式锁,并更新redis,并且返回modifiedTime:2017-10-3 12:30:01 的商品信息,同时redis cluster 中的数据也必须是 modifiedTime:2017-10-3 12:30:01 的商品信息,表示测试通过,这里都说得我有点迫不及待了,来吧,试试

    主动更新缓存 浏览器请求 打印日志 redis cluster 最新商品数据

    好了,是不是很激动呀,今天的讲解就到这里哈,赶紧去试试吧!

    注:这里只是以商品信息为例来讲解利用分布式锁解决缓存重建冲突,其他如商铺信息等也是同理,这里希望告诉你的是方法,就不一一演示了。

    以上就是本章内容,如有不对的地方,请多多指教,谢谢!

    为了方便有需要的人,本系列全部软件都在 https://pan.baidu.com/s/1qYsJZfY

    下章预告:主要讲解 简单看 storm

    代码地址附上:https://github.com/bill5/cache-project/tree/master/cache-cache

    作者:逐暗者 (转载请注明出处)

    相关文章

      网友评论

        本文标题:30 带你利用zookeeper 分布式锁解决缓存重建冲突

        本文链接:https://www.haomeiwen.com/subject/qlvxyxtx.html