美文网首页
zookeeper分布式锁解决方案

zookeeper分布式锁解决方案

作者: sknfie | 来源:发表于2020-09-25 18:37 被阅读0次

分布式锁原理

  1. 创建一个 zk 临时 node,来模拟一个商品 id 加锁。
  2. zk 会保证一个 node path 只会被创建一次,如果已经被创建,则抛出 NodeExistsException。
  3. 这个时候可以去做业务操作。
  4. 释放锁,则是删除这个临时 node。
    当一个多个缓存服务去对同一个商品 id 加锁时,只有一个成功, 其他的则轮循等待锁被释放,获取到锁之后,对比一下商品的时间版本,较新则重建缓存,否则放弃重建。

业务功能

1、主动更新
监听kafka消息队列,获取到一个商品变更的消息之后,去源服务中调用接口拉取数据,更新到ehcache和redis中。先获取分布式锁,然后才能更新redis,同时更新时要比较时间版本。

    // 加锁
    ZooKeeperSession zks = ZooKeeperSession.getInstance();
    zks.acquireDistributedLock(productId);
    try {
        // 先获取一次 redis ,防止其他实例已经放入数据了
        ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
        if (existedProduct != null) {
            // 判定通过消息获取到的数据版本和 redis 中的谁最新
            Date existedModifyTime = existedProduct.getModifyTime();
            Date modifyTime = productInfo.getModifyTime();
            // 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
            if (modifyTime.after(existedModifyTime)) {
                cacheService.saveProductInfo2LocalCache(productInfo);
                log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
                cacheService.saveProductInfo2ReidsCache(productInfo);
            }
        } else {
            // redis 中没有数据,直接放入
            cacheService.saveProductInfo2LocalCache(productInfo);
            log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
            cacheService.saveProductInfo2ReidsCache(productInfo);
        }
    } finally {
        // 最后释放锁
        zks.releaseDistributedLock(productId);
    }

2、被动重建
直接读取源头数据,直接返回给nginx,同时推送一条消息到一个队列,后台线程异步消费。
后台现成负责先获取分布式锁,然后才能更新redis,同时要比较时间版本。

测试

1.首先先删除 redis 中测试的数据:

redis-cli -h 192.168.201.33 -p 7001 -c
192.168.201.33:7001> get product_info_1
-> Redirected to slot [9029] located at 192.168.201.34:7003
"{\"color\":\"\xe7\xba\xa2\xe8\x89\xb2,\xe7\x99\xbd\xe8\x89\xb2,\xe9\xbb\x91\xe8\x89\xb2\",\"id\":1,\"name\":\"iphone7\xe6\x89\x8b\xe6\x9c\xba\",\"pictureList\":\"a.jpg,b.jpg\",\"price\":5599,\"service\":\"iphone7\xe7\x9a\x84\xe5\x94\xae\xe5\x90\x8e\xe6\x9c\x8d\xe5\x8a\xa1\",\"shopId\":1,\"size\":\"5.5\",\"specification\":\"iphone7\xe7\x9a\x84\xe8\xa7\x84\xe6\xa0\xbc\"}"

192.168.201.34:7003> del product_info_1
(integer) 1
192.168.201.34:7003> get product_info_1
(nil)

2.模拟向kafka发送主动更新的消息:

cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list 192.168.201.33:9092,192.168.201.34:9092,192.168.201.35:9092 --topic cache-message
{"serviceId":"productInfoService","productId":"4"}
//kafka消息队列主动更新
=================从redis中获取缓存,商品信息=null
=================从ehcache中获取缓存,商品信息=null
  1. 前端访问更新:
http://localhost:8080/getProductInfo?productId=4](http://localhost:8080/getProductInfo?productId=4

//Thread
the 1 times try to acquire lock for product[id=4]......
the 2 times try to acquire lock for product[id=4]......
the 3 times try to acquire lock for product[id=4]......

//kafka主动更新并释放锁
===================获取刚保存到本地缓存的商品信息:ProductInfo [id=4, name=iphone7手机, price=5599.0, pictureList=a.jpg,b.jpg, specification=iphone7的规格, service=iphone7的售后服务, color=红色,白色,黑色, size=5.5, shopId=1, modifiedTime=2020-09-24 12:00:00]
release the lock for product[id=4]......

//Thread获取所并比较时间后发现是最新数据,然后更新
success to acquire lock for product[id=4] after 3 times try......
current date[2017-01-01 12:01:00] is before existed date[2020-09-24 12:00:00]

//最后也没展示最新数据
{"id":4,"name":"iphone7手机","price":5599.0,"pictureList":"a.jpg,b.jpg","specification":"iphone7的规格","service":"iphone7的售后服务","color":"红色,白色,黑色","size":"5.5","shopId":1,"modifiedTime":"2017-01-01 12:01:00"}

相关文章

网友评论

      本文标题:zookeeper分布式锁解决方案

      本文链接:https://www.haomeiwen.com/subject/wepfuktx.html