概述
目前几乎很多应用都是分布式部署的,我们的应用中有一些业务需要定时执行,虽然有幂等性保护,但是不想让一次任务被调度多次(打印太多错误日志,数据库主键约束,耗费资源)。所以选择使用基于Redis的分布式锁
Redis分布式锁特性
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
业务实现
刚开始选择 Redis 中的 SETNX 来获取锁。
Set key to hold string value if key does not exist.
In that case, it is equal to SET.
When key already holds a value, no operation is performed.
SETNX is short for "SET if Not eXists".
Return value
Integer reply, specifically:
1 if the key was set
0 if the key was not set
看代码:
public void startTask() {
//先判断是否有节点执行该任务
try (Jedis jedis = jedisSentinelPool.getResource()) {
long ttl = jedis.ttl(TASK_KEY);
LogWriter.getAsyncLog().info("ttl:" + ttl);
//1
if (ttl == -1) {
jedis.del(TASK_KEY);
}
//2
long flag = jedis.setnx(TASK_KEY, "true");
LogWriter.getAsyncLog().info("redis setnx flag: " + flag);
if (flag == 1) {
//3
jedis.expire(TASK_KEY, TASK_KEY_EXPIRE);
LogWriter.getAsyncLog().info("### start a new schedule task.");
}
} catch (Exception e) {
e.printStackTrace();
LogWriter.getErrorLog().error(e.getMessage());
}
}
我们在获取redis锁的时候会使用 setnx 和 expire 两条命令来实现的, 这不是个原子操作. 所以就会导致这过程中出很多问题.
其中一点就是当我们执行上述代码第3步的时候,由于网络波动或者别的异常原因,导致锁的TTL设置失败。因此我们加了第一步:判断锁的TTL值。
我们虽然对上面逻辑进行了判断,但是都是无用功,因为那些判断都不能保证这两条命令的原子性。
比如:节点1执行完第2步,节点2开始执行任务;这时候节点2执行第1步获取锁的TTL就会是-1,并执行if内的程序。最终执行第2步、第3步。此时节点1、节点2都会该定时任务的逻辑,违背了我们业务的初衷。
节点1:
2018-08-14 15:00:00,003 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(41)|ttl:-2
2018-08-14 15:00:00,004 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(48)|redis setnx flag: 1
2018-08-14 15:00:00,005 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(52)|### start a new upload schedule task , now is 1534230000004
节点2:
2018-08-14 15:00:00,003 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(41)|ttl:-1
2018-08-14 15:00:00,004 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(48)|redis setnx flag: 1
2018-08-14 15:00:00,005 INFO [DefaultQuartzScheduler_Worker-7] UploadScheduleTask.startTask(52)|### start a new upload schedule task , now is 1534230000005
重新翻看SETNX的官方文档:
Design pattern: Locking with SETNX
Please note that:
The following pattern is discouraged in favor of the Redlock algorithm which is only a bit more complex to implement, but offers better guarantees and is fault tolerant.
We document the old pattern anyway because certain existing implementations link to this page as a reference. Moreover it is an interesting example of how Redis commands can be used in order to mount programming primitives.
Anyway even assuming a single-instance locking primitive, starting with 2.6.12 it is possible to create a much simpler locking primitive, equivalent to the one discussed here, using the SET command to acquire the lock, and a simple Lua script to release the lock. The pattern is documented in the SET command page.
这里说明从2.6.12版本后, 就可以使用set来获取锁, Lua 脚本来释放锁
然后我们看下 set命令的说明, 发现这里面可以有nx,xx等参数, 来实现 setnx 的功能.
SET key value [expiration EX seconds|PX milliseconds] [NX|XX]
Options
Starting with Redis 2.6.12 SET supports a set of options that modify its behavior:
EX seconds -- Set the specified expire time, in seconds.
PX milliseconds -- Set the specified expire time, in milliseconds.
NX -- Only set the key if it does not already exist.
XX -- Only set the key if it already exist.
Note: Since the SET command options can replace SETNX, SETEX, PSETEX, it is possible that in future versions of Redis these three commands will be deprecated and finally removed.
这样就可以保证了原子性。
修改后的逻辑代码:
try (Jedis jedis = jedisSentinelPool.getResource()) {
/**
* 前期使用SETNX 作分布式锁。
* 但是该命令 设置key,value 与 expire 分为两步。
* 官方建议在redis 2.6.12 后,
* 使用 SET key value [expiration EX seconds|PX milliseconds] [NX|XX] 来代替 SETNX。
*/
String flag = jedis.set(TASK_KEY, "true", "nx", "ex", TASK_KEY_EXPIRE);
if ("OK".equals(flag)) {
jedis.expire(TASK_KEY, TASK_KEY_EXPIRE);
LogWriter.getAsyncLog().info("### start a new schedule task.");
}
/**
* 后期如果需要在任务执行完毕 or 执行过程中出现异常 需要删除key 时 建议执行以下代码。
* 此代码是redis官方推荐使用Lua脚本删除key.
* String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
* Long result = (Long) jedis.eval(script, Collections.singletonList(TASK_KEY), Collections.singletonList("true"));
*/
} catch (Exception e) {
e.printStackTrace();
LogWriter.getErrorLog().error(e.getMessage());
}
}
网友评论