最近在做业务的过程中,PM大大提了一个需求--程序每隔十分钟运行一次统计修改的数据,这对我们程序员来说就是一个定时任务。根据需求很快把业务逻辑实现了,接下来就是怎么运行这段程序的问题。由于现在的程序都是集群部署的模式,很少有单个实例运行。我们希望在某一个时刻只有一个实例来运行这段定时任务,而不希望所有的实例都运行,造成所谓的“惊群现象”,造成不必要的资源浪费。这时就需要一个分布式锁,来保证实例之间的同步。
基于redis的分布式锁的实现
由于redis的命令是原子性的,所以获取锁的命令,如果返回OK,就代表获取锁成功,然后可以执行脚本,否则就代表获取锁失败。记住一定要给锁加上过期时间,要不然,假如一个线程获取锁之后,这个线程挂了,然后这个锁就会一直存在,其他线程就没有办法获取锁。
set my-lock my-value NX EX 10*60
解锁的过程稍微麻烦些:我们需要先比对锁的值是不是当前线程所加的,然后才能释放。要不然会造成,有可能释放了其他线程加的锁。举个栗子,比如某一个线程加锁成功,然后设置了锁的过期时间为30s,然而在执行业务逻辑的时候花费了40s,再去释放锁的时候,这个锁已经不存在了,存在的锁有可能是另外一个线程加的。如果直接释放锁,就会释放了其他线程所加的锁。当然这也要求我们锁的值,必须是一个随机具有唯一性的值。
释放锁相当于两步操作,先比较值,然后再删除锁,为了保证原子性,我们需要用lua脚本来实现:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
上面的加锁解锁过程是基于redis单节点的锁。一般我们为了redis的高可用性,都会把redis部署成主从结构。在主从节点上,如果redis master节点宕机了,那么切换到slave上,但由于redis的主从复制是一步的,这可能在failover的过程中丧失锁的安全性。举个栗子:
- 客户端1从Master获取了锁。
- Master宕机了,存储锁的key还没有来得及同步到Slave上。
- Slave升级为Master。
- 客户端2从新的Master获取到了对应同一个资源的锁
- 客户1和客户端2同时持有了同一个资源的锁。
考虑到这种情况,redis的作者提出了一种基于多个节点锁的算法--redlock,有兴趣的可以去关注一下,但是这个算法在社区也有人提出了不同的看法,可以在网上找一下作者和其他人员的论道。这个算法各个语言都有自己的实现,主要步骤为:
- 获取当前时间
- 依次向N个Redis节点执行获取锁的操作
- 计算整个获取缩锁的过程总共消耗了多长时间,计算方法是用当前时间减去第一步记录的时间。如果获得了大多数节点(N/2+1)并且总共消耗的时间没有超过所得有效时间,那么这时任务客户端才认为最终获锁成功,否则,认为最终获取锁失败。
- 如果最终获取获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第三部计算出来的获取锁消耗的时间。
- 如果最终获取锁失败了,那么客户端应该立即向所有的Redis节点发起释放锁的操作。
基于etcd的分布式锁实现
etcd是一个用于配置共享和服务发现的键值存储,受到Zookeeper与doozer启发而催生的项目,etcd能够提供强一致性的保证。etcd每次修改键值存储的操作被分配有一个唯一的增加的修订版本,事务中的操作只分配一个修订版本;还有根据键的前缀能够模拟文件夹的层级,利用这些特性我们可以根据是不是当前目录下最小的节点来决定是不是获取到锁。etcd v3客户端驱动concurrency包中分装了一个分部式锁,其中获取锁的部分源码是
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
get := v3.OpGet(m.myKey)
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
etcdv3 client提供了Lock 和Unlock,能够提供一个分布式锁。
最后,考虑到业务和现有架构,获取锁成功就执行业务逻辑,如果没有获取锁,不需要重试,再者也没必要引入etcd,就基于redis实现了分布式锁。
网友评论