分布式锁要满足四个基本的特点:
1.互斥性
2.加锁和解锁的人必须一致
3.不能发生死锁
4.容错性。
redisson中,通过检查key是否存在来保证唯一性。
同时加锁的时候,加锁的根据redisson客户端uuid+线程id组生成客户端唯一uuid,写入到哈希表中。
每个锁的名称就是key,给每个key设置ttl。一旦过期key就会被删除,保证不会发生死锁。
redisson巧妙的利用redis执行脚本的时候原子执行的特点,将加锁和解锁的过程封装在lua脚本中,实现加锁的原子性。
加锁的脚本:
--[[
/*
*key number:1
*KEYS[1]: 锁名
*ARGV[1]: 锁的存活期
*ARGV[2]: 锁id
*返回值: 0 - ok, >0 - fail
*/
]]--
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间\
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
加锁lua脚本的过程如下:
1.PNG
解锁lua脚本:
--[[
/*
*key number:2
*KEYS[1]: lock_name
*KEYS[2]: pub_chan_name,would be: hd_dlock_chan:${lock_name}
*ARGV[1]: publish content. 0 - unlock
*ARGV[2]: 锁的存活期
*ARGV[3]: 锁id
*/
]]--
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
解锁lua脚本过程如下:
2.PNG
redisson还支持递归锁。
一般情况,redis的分布式锁客户端上锁失败后,常用的做法是本地循环等待锁的释放。JAVA和C++都可以通过future来实现阻塞式的等待。
C++下用future实现的接口如下:
#include <uuid/uuid.h>
#include <unistd.h>
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
extern RedisProxy g_redis_proxy;
std::string get_sys_uuid()
{
uuid_t uu;
uuid_generate( uu );
char buf[33]={0};
for(int i=0;i<16;i++)
{
sprintf(buf+i*2,"%02x",uu[i]);
}
//printf("%s\n",buf);
return string(buf);
}
std::string g_sys_uuid=get_sys_uuid();
/*
*key number:1
*KEYS[1]: lock_name
*ARGV[1]: lock ttl
*ARGV[2]: key id
*return value: 0 - ok, >0 - fail
*/
const std::string lua_lock_cript="\
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间\
if (redis.call('exists', KEYS[1]) == 0) then\
redis.call('hset', KEYS[1], ARGV[2], 1);\
redis.call('pexpire', KEYS[1], ARGV[1]);\
return nil;\
end;\
\
-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间\
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\
redis.call('hincrby', KEYS[1], ARGV[2], 1);\
redis.call('pexpire', KEYS[1], ARGV[1]);\
return nil;\
end;\
\
-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间\
return redis.call('pttl', KEYS[1]);\
";
/*
*key number:2
*KEYS[1]: lock_name
*KEYS[2]: pub_chan_name,would be: hd_dlock_chan:${lock_name}
*ARGV[1]: publish content. 0 - unlock
*ARGV[2]: lock ttl
*ARGV[3]: key id
*/
const std::string lua_unlock_cript="\
-- 若锁不存在:则直接广播解锁消息,并返回1\
if (redis.call('exists', KEYS[1]) == 0) then\
redis.call('publish', KEYS[2], ARGV[1]);\
return 1;\
end;\
\
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁\
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\
return nil;\
end;\
\
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1\
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\
if (counter > 0) then\
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期\
redis.call('pexpire', KEYS[1], ARGV[2]);\
return 0;\
else\
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程\
redis.call('del', KEYS[1]);\
redis.call('publish', KEYS[2], ARGV[1]);\
return 1;\
end;\
";
const std::string HD_DLOCK_PUB_CHAN_PRE="hd_dlock_chan:";
int dlock_subscribe(std::string chan)
{
return g_redis_proxy.subscribe(chan);
}
int try_lock(std::string &lock_name, int ttl, int timeout)
{
if(timeout<=0||ttl<0) {return -1;}
uint64 howlong=0;
do{
uint64 t1=get_millisecond();
std::string key_id=g_sys_uuid+to_string(getpid());
int ret=g_redis_proxy.eval3(lua_lock_cript,1,lock_name,to_string(ttl),key_id);
if(ret==0)
{
return 0;
}
uint64 t2=get_millisecond();
howlong+=t2-t1;
if(howlong>=timeout) {return -1;}
std::chrono::milliseconds span(1000*(timeout-howlong));
std::string pub_chan=HD_DLOCK_PUB_CHAN_PRE+lock_name;
std::future<int> fut = std::async(dlock_subscribe, pub_chan);
// timeout, no one release lock
if(fut.wait_for(span) == std::future_status::timeout)
{
g_redis_proxy.unsubscribe(pub_chan);
return -1;
}
//else someone release lock
ret = fut.get(); //must be 0,because we send 0 only
uint64 t3=get_millisecond();
howlong+=t3-t2;
}while(1);
}
int unlock(std::string &lock_name,int ttl)
{
std::string key_id=g_sys_uuid+to_string(getpid());
std::string pub_chan=HD_DLOCK_PUB_CHAN_PRE+lock_name;
int ret=g_redis_proxy.eval5(lua_unlock_cript,2,lock_name,pub_chan,to_string(0),to_string(ttl),key_id);
return ret;
}
网友评论