/**
 * Runs {@code callback} under a blocking, Redis-backed distributed lock.
 *
 * <p>Waits up to {@code acquireTimeout} milliseconds for the lock. If the lock is
 * obtained, {@link Callback#execute()} is invoked and the lock is released afterwards,
 * even when the callback throws. If the lock cannot be obtained,
 * {@link Callback#acquireLockTimeout()} is invoked instead and nothing is released.
 *
 * @param lockName       lock name (used to build the Redis key)
 * @param acquireTimeout maximum time to wait for the lock, in milliseconds
 * @param lockTimeout    lock expiry, in milliseconds
 * @param callback       business logic for the "acquired" and "timed out" outcomes
 * @param <T>            result type produced by the callback
 * @return the value returned by whichever callback method was invoked
 */
public static <T> T executeWithRedisLock(String lockName, long acquireTimeout, long lockTimeout, Callback<T> callback) {
    // Acquire outside the try block: if acquisition fails or throws, no lock is held,
    // so the release path (and its pooled-connection checkout) must not run.
    String lockValue = acquireLock(lockName, acquireTimeout, lockTimeout);
    if (lockValue == null) {
        return callback.acquireLockTimeout();
    }
    try {
        return callback.execute();
    } finally {
        // Only the holder of lockValue may release the lock.
        releaseLock(lockName, lockValue);
    }
}
/**
* Pair of callbacks supplied to executeWithRedisLock: one for the case where the
* lock was acquired and one for the case where acquiring it timed out.
* @param <T> type of the value handed back to the caller
*/
public interface Callback<T>{
/**
* Business logic to run when the lock was acquired successfully.
* @return the result to hand back to the caller of executeWithRedisLock
*/
T execute();
/**
* Fallback logic to run when the lock could not be acquired
* (acquireLock returned null).
* @return the result to hand back to the caller of executeWithRedisLock
*/
T acquireLockTimeout();
}
/**
 * Tries to acquire the blocking distributed lock, polling until it succeeds or
 * {@code acquireTimeout} elapses. Pairs with {@code releaseLock}.
 *
 * @param lockName       lock name; prefixed with {@code DISTRIBUTE_LOCK_PREFIX} to form the Redis key
 * @param acquireTimeout maximum time to keep trying, in milliseconds
 * @param lockTimeout    lock expiry, in milliseconds (rounded up to whole seconds, minimum 1 second)
 * @return the lock value to pass to {@code releaseLock}, or {@code null} when the lock was not
 *         acquired (timeout, thread interruption, or a Jedis error)
 */
private static String acquireLock(String lockName, long acquireTimeout, long lockTimeout) {
    Jedis conn = getJedisPool().getResource();
    boolean broken = false;
    try {
        // NOTE(review): a nanoTime-based value is not guaranteed unique across JVMs; it is
        // kept numeric here in case other code parses stored lock values - confirm before changing.
        String identifier = System.nanoTime() + "";
        String lockKey = DISTRIBUTE_LOCK_PREFIX + lockName;

        // EXPIRE works in whole seconds and a non-positive value deletes the key outright,
        // so round up and never go below one second (a plain cast turned 500 ms into 0).
        long expireSeconds = lockTimeout / 1000L + (lockTimeout % 1000L > 0 ? 1L : 0L);
        int lockExpire = (int) Math.min((long) Integer.MAX_VALUE, Math.max(1L, expireSeconds));

        long end = System.currentTimeMillis() + acquireTimeout;
        while (System.currentTimeMillis() < end) {
            if (conn.setnx(lockKey, identifier) == 1) {
                conn.expire(lockKey, lockExpire);
                return identifier;
            }
            // A holder that died between SETNX and EXPIRE leaves a key with no TTL (-1).
            // Give it one so the lock eventually frees itself. This replaces the former
            // "delete if older than lockTimeout" check, which compared System.nanoTime()
            // values from different JVMs and could delete a lock that was still valid.
            if (conn.ttl(lockKey) == -1) {
                conn.expire(lockKey, lockExpire);
            }
            try {
                Thread.sleep(20); // back off 20 ms before the next attempt
            } catch (InterruptedException ie) {
                // Restore the flag and stop waiting; looping on would make every later
                // sleep() throw immediately and busy-spin until the deadline.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Lock not acquired within the time limit (or the wait was interrupted).
        return null;
    } catch (JedisException e) {
        broken = handleJedisException(e);
        return null;
    } finally {
        closeResource(conn, broken);
    }
}
/**
 * Releases the distributed lock, but only if it is still held with the given value.
 * Pairs with {@code acquireLock}.
 *
 * <p>Best effort: Jedis errors are swallowed after the connection is classified as
 * broken or healthy; in that case the lock is left to expire on its own.
 *
 * @param lockName   lock name; prefixed with {@code DISTRIBUTE_LOCK_PREFIX} to form the Redis key
 * @param identifier lock value returned by {@code acquireLock}; {@code null} or empty means
 *                   no lock is held and the call does nothing
 */
private static void releaseLock(String lockName, String identifier) {
    if (identifier == null || identifier.isEmpty()) {
        // Nothing was acquired, so do not check out a pooled connection at all.
        return;
    }
    Jedis conn = null;
    boolean broken = false;
    try {
        conn = getJedisPool().getResource();
        String lockKey = DISTRIBUTE_LOCK_PREFIX + lockName;
        // Compare-and-delete must be a single atomic step. With a separate GET and DEL the
        // lock can expire and be taken by another client in between, and the DEL would then
        // remove that client's lock. A Lua script runs atomically on the server.
        String releaseScript =
                "if redis.call('get', KEYS[1]) == ARGV[1] then "
                        + "return redis.call('del', KEYS[1]) "
                        + "else return 0 end";
        conn.eval(releaseScript,
                java.util.Collections.singletonList(lockKey),
                java.util.Collections.singletonList(identifier));
    } catch (JedisException e) {
        broken = handleJedisException(e);
    } finally {
        if (conn != null) {
            closeResource(conn, broken);
        }
    }
}
/**
 * Decides whether the connection that raised {@code jedisException} should be
 * treated as broken.
 *
 * @param jedisException the exception raised by a Jedis call
 * @return {@code false} only for a {@code JedisDataException} whose message is absent or
 *         does not mention {@code READONLY}; {@code true} for every other exception
 */
private static boolean handleJedisException(JedisException jedisException) {
    // Anything other than a data-level error marks the connection as broken.
    if (!(jedisException instanceof JedisDataException)) {
        return true;
    }
    // A data error counts as broken only when its message mentions READONLY.
    String message = jedisException.getMessage();
    return message != null && message.contains("READONLY");
}
/**
 * Hands a connection back to {@code jedisPool}.
 *
 * @param jedis           the connection to return
 * @param conectionBroken {@code true} to return it via {@code returnBrokenResource},
 *                        {@code false} to return it via {@code returnResource}
 */
private static void closeResource(Jedis jedis, boolean conectionBroken) {
    if (!conectionBroken) {
        jedisPool.returnResource(jedis);
        return;
    }
    jedisPool.returnBrokenResource(jedis);
}
// Reader comments (heading carried over from the source web page; not part of the code)