分布式锁原理
- 创建一个 zk 临时 node,来模拟一个商品 id 加锁。
- zk 会保证一个 node path 只会被创建一次,如果已经被创建,则抛出 NodeExistsException。
- 这个时候可以去做业务操作。
- 释放锁,则是删除这个临时 node。
当一个多个缓存服务去对同一个商品 id 加锁时,只有一个成功, 其他的则轮循等待锁被释放,获取到锁之后,对比一下商品的时间版本,较新则重建缓存,否则放弃重建。
业务功能
1、主动更新
监听kafka消息队列,获取到一个商品变更的消息之后,去源服务中调用接口拉取数据,更新到ehcache和redis中。先获取分布式锁,然后才能更新redis,同时更新时要比较时间版本。
// 加锁
ZooKeeperSession zks = ZooKeeperSession.getInstance();
zks.acquireDistributedLock(productId);
try {
// 先获取一次 redis ,防止其他实例已经放入数据了
ProductInfo existedProduct = cacheService.getProductInfoOfReidsCache(productId);
if (existedProduct != null) {
// 判定通过消息获取到的数据版本和 redis 中的谁最新
Date existedModifyTime = existedProduct.getModifyTime();
Date modifyTime = productInfo.getModifyTime();
// 如果本次获取到的修改时间大于 redis 中的,那么说明此数据是最新的,可以放入 redis 中
if (modifyTime.after(existedModifyTime)) {
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("最新数据覆盖 redis 中的数据:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
} else {
// redis 中没有数据,直接放入
cacheService.saveProductInfo2LocalCache(productInfo);
log.info("获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));
cacheService.saveProductInfo2ReidsCache(productInfo);
}
} finally {
// 最后释放锁
zks.releaseDistributedLock(productId);
}
2、被动重建
直接读取源头数据,直接返回给nginx,同时推送一条消息到一个队列,后台线程异步消费。
后台现成负责先获取分布式锁,然后才能更新redis,同时要比较时间版本。
测试
1.首先先删除 redis 中测试的数据:
redis-cli -h 192.168.201.33 -p 7001 -c
192.168.201.33:7001> get product_info_1
-> Redirected to slot [9029] located at 192.168.201.34:7003
"{\"color\":\"\xe7\xba\xa2\xe8\x89\xb2,\xe7\x99\xbd\xe8\x89\xb2,\xe9\xbb\x91\xe8\x89\xb2\",\"id\":1,\"name\":\"iphone7\xe6\x89\x8b\xe6\x9c\xba\",\"pictureList\":\"a.jpg,b.jpg\",\"price\":5599,\"service\":\"iphone7\xe7\x9a\x84\xe5\x94\xae\xe5\x90\x8e\xe6\x9c\x8d\xe5\x8a\xa1\",\"shopId\":1,\"size\":\"5.5\",\"specification\":\"iphone7\xe7\x9a\x84\xe8\xa7\x84\xe6\xa0\xbc\"}"
192.168.201.34:7003> del product_info_1
(integer) 1
192.168.201.34:7003> get product_info_1
(nil)
2.模拟向kafka发送主动更新的消息:
cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list 192.168.201.33:9092,192.168.201.34:9092,192.168.201.35:9092 --topic cache-message
{"serviceId":"productInfoService","productId":"4"}
//kafka消息队列主动更新
=================从redis中获取缓存,商品信息=null
=================从ehcache中获取缓存,商品信息=null
- 前端访问更新:
http://localhost:8080/getProductInfo?productId=4](http://localhost:8080/getProductInfo?productId=4
//Thread
the 1 times try to acquire lock for product[id=4]......
the 2 times try to acquire lock for product[id=4]......
the 3 times try to acquire lock for product[id=4]......
//kafka主动更新并释放锁
===================获取刚保存到本地缓存的商品信息:ProductInfo [id=4, name=iphone7手机, price=5599.0, pictureList=a.jpg,b.jpg, specification=iphone7的规格, service=iphone7的售后服务, color=红色,白色,黑色, size=5.5, shopId=1, modifiedTime=2020-09-24 12:00:00]
release the lock for product[id=4]......
//Thread获取所并比较时间后发现是最新数据,然后更新
success to acquire lock for product[id=4] after 3 times try......
current date[2017-01-01 12:01:00] is before existed date[2020-09-24 12:00:00]
//最后也没展示最新数据
{"id":4,"name":"iphone7手机","price":5599.0,"pictureList":"a.jpg,b.jpg","specification":"iphone7的规格","service":"iphone7的售后服务","color":"红色,白色,黑色","size":"5.5","shopId":1,"modifiedTime":"2017-01-01 12:01:00"}
网友评论