1. 概述
当前主流大型互联网的业务系统的业务都是高度复杂,技术架构都是分布式,这样提高可用性和和稳定性的同时,也带来多节点在同时段执行的过程中相互干扰,这在对同一资源的处理会产生数据的不一致。
那如何能保证在同一时刻该资源只能被一个应用处理,而不能并发处理。答案就是引入一种分布式协调机制来调度,而这种分布式协调机制的核心我们称之为分布式锁。
- 成员变量 V 存在 JVM_1、JVM_2、JVM_3 三个 JVM 内存中
- 成员变量 V 同时都会在JVM分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的
- 不是同时发过来,三个请求分别操作三个不同 JVM 内存区域的数据,变量 V 之间不存在共享,也不具有可见性,处理的结果也是不对的
注:在类中,该成员变量V是一个有状态的对象。
2. 特点
- 在分布式系统环境下,同一个方法在同一时间只能被一个机器的同一个线程执行;
- 高可用的获取锁与释放锁;
- 高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁的自我失效机制,防止死锁;
- 满足非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
3. 实现方式
根据分布式的CAP理论我们了解“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”
所以我们在系统设计之初就分析调研,根据分析调研结果对这三者做取舍。
借鉴在主流互联网的经验,通常靠牺牲一致性 来换取系统的高可用性,系统的“一致性”保障只能用“最终一致性”来保证,这个最终时间需要在可控可接受的范围内。很多场景下,我们为了保证最终一致性,都会做很多技术方案来支持,比如分布式事务、分布式锁。
在分布式锁的技术实现上,主流认可有三种实现方式,从复杂度来看,由难至易依次增加:
基于数据库实现分布式锁;
基于缓存(Redis/Memcached等)实现分布式锁;
基于Zookeeper实现分布式锁;
无论哪一种方式,都不可能完美,需要根据实际业务场景做出选择。
4. 数据库实现
4.1. 前提
在了解数据库实现分布式锁的之前,我们首了解乐观锁(Optimistic Lock)和悲观锁(Pessimistic Lock)。
4.1.1. 乐观锁
每次去取数据的时候都会认为不会有其他线程对数据进行修改,因此不会上锁,但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用版本号机制或CAS操作实现;乐观锁适用于只“读” 的应用场景,这样做可以提高吞吐量,像数据库使用的write_condition的机制,本身就是乐观锁。
4.1.2. 悲观锁
每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,都需要阻塞(block)挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁,它对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。在 Java中,synchronized的思想也是悲观锁。
乐观锁、悲观锁两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。本质上,数据库的乐观锁做法和悲观锁做法主要就是避免丢失更新问题:
4.2. 乐观锁实现
从上面的前提概要我们知道,乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。
当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行修改写回数据库时,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。
image假定同一账号下,小明和小明媳妇同时取银行进行取款操作,账户余额200¥,小明取款100¥,小明媳妇取款150¥。
-
没有锁机制的场景下,可能会出现账户同时被扣款150¥和100¥,导致账户余额出现负数情况,这样的话银行可能混不下去。
-
如果我们用到锁机制,小明和小明媳妇都在取款时候,除看到余额的操作,还在后台事务中读取版本号version=1,不论小明和小明媳妇先成功执行取款操作,都会将版本号version+1,即version=2,没有成功执行的那一方再去操作时候读取出来的version是2,那么就会触发重新获取余额。
乐观锁关键点:
-
锁服务要有递增的版本号version
-
每次更新数据都要先判断版本号对不对,然后再写入新的版本号
4.3. 悲观锁实现
4.3.1. 方案1_for update
Oracle、Mysql中是基于for update来实现加锁的。在MYSQL中需要注意的是,在InnoDB中只有字段加了索引的,才会是行级锁,否者是表级锁,所以一定要对where的条件字段加索引。
当这条记录加上排它锁之后,其它线程是无法操作这条记录的。
4.3.2. 方案2_唯一性约束
对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功。
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', 'methodName');
4.3.3. 方案3_查询并占有
先获取锁的信息:
select id, method_name, state,version from method_lock where state=1 and method_name='methodName';
再占有
update t_resoure set state=2, version=2, update_time=now() where method_name='methodName' and state=1 and version=2;
如果没有更新影响到一行数据,则说明这个资源已经被别人占位。
4.3.4. 附 表结构
DROP TABLE IF EXISTS method_lock;
CREATE TABLE method_lock (
lck_id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
method_name VARCHAR(64) NOT NULL COMMENT '锁定的方法名',
state TINYINT NOT NULL COMMENT '1:未分配;2:已分配',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
ver INT NOT NULL COMMENT '版本号',
PRIMARY KEY (lck_id),
UNIQUE KEY uidx_method_name (method_name) USING BTREE
) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
4.4. 总结
4.4.1. 缺点
这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
配置实现细节
-
数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
-
没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
-
非阻塞的?搞一个while循环,直到insert成功再返回成功。
-
非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
5. Redis实现
基于Redis实现的锁机制,主要是依赖redis自身的原子操作,因为redis是单线程。要求redis版本大于2.6.12。
5.1. redis单实例
- 引入pom
版本号大家以实际使用中的为准,我这里仅供参考,因为spring-boot的自动注册功能会为我们提供StringRedisTemplate,直接使用即可。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
- yml文件
server:
port: 9090
spring:
datasource:
name: mysql
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/springboot?characterEncoding=utf-8&useSSL=true
username: root
password: 123456
druid:
initial-size: 5
min-idle: 5
max-active: 20
max-wait: 30000
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
validation-query: select 1
test-while-idle: true
test-on-borrow: false
test-on-return: false
pool-prepared-statements: false
max-pool-prepared-statement-per-connection-size: 20
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=6000
redis:
database: 0
host: 127.0.0.1
port: 6379
mybatis:
mapperLocations: classpath:mapper/**/*.xml
- 核心实现
package xyz.wongs.weathertop.comp;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RedisLockComponent {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* @Description 获取锁,默认:失效时间为5,失效时间的单位为秒,重试次数为3,休眠4秒
* @param key
* @param value
* @param expire redis过期时间
* @return boolean
* @throws
* @date 2019/11/16 21:37
*/
public boolean getLock(String key,String value){
return getLock(key,value,5, TimeUnit.SECONDS,3,5000);
}
/**
* @Description 获取锁,默认:失效时间的单位为秒,重试次数为3,休眠4秒
* @param key
* @param value
* @param expire redis过期时间
* @return boolean
* @throws
* @date 2019/11/16 21:37
*/
public boolean getLock(String key,String value,long expire){
return getLock(key,value,expire, TimeUnit.SECONDS,3,5000);
}
/**
* @Description 获取锁,默认重试次数为3,休眠5秒
* @param key
* @param value
* @param expire redis过期时间
* @param unit
* @return boolean
* @throws
* @date 2019/11/16 21:37
*/
public boolean getLock(String key,String value,long expire, TimeUnit unit){
return getLock(key,value,expire, unit,3,5000);
}
/**
* @Description
* @param key
* @param value
* @param expire redis过期时间
* @param unit
* @param tryCount 重试次数
* @param waitMillis 每次重试要等待时间
* @return boolean
* @throws
* @date 2019/11/16 21:37
*/
public boolean getLock(String key,String value,long expire, TimeUnit unit,int tryCount,int waitMillis){
//setIfAbsent如果键不存在则新增,存在则不改变已经有的值。
boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key,value,expire,unit);
if(success) {
return true;
}
//判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
String val = stringRedisTemplate.opsForValue().get(key);
if(!Strings.isNullOrEmpty(val)){
if(System.currentTimeMillis() - Long.parseLong(val) > unit.toMillis(expire)){
// 超时移除
stringRedisTemplate.delete(key);
}
}
// 重试、等待
if (tryCount > 0 && waitMillis > 0) {
try {
Thread.sleep(waitMillis);
} catch (InterruptedException e) {
log.error("getLock exception{}",e.getMessage());
}
return getLock(key,value,expire,unit,tryCount - 1,waitMillis);
}
return false;
}
/**
* @Description 获取等待时间
* @param key
* @param expire
* @param unit
* @return long 秒
* @throws
* @date 2019/11/16 21:44
*/
public long getWaitSecond(String key,long expire,TimeUnit unit) {
long currentTime = System.currentTimeMillis();
long preTime = Long.parseLong(stringRedisTemplate.opsForValue().get(key));
return (preTime + unit.toMillis(expire) - currentTime) / 1000;
}
/**
* @Description 设置锁的过期时间,默认单位为毫秒
* @param key
* @param expTime
* @return Boolean
* @throws
* @date 2019/11/16 21:18
*/
public Boolean renewal(String key,int expTime){
return renewal(key, expTime, TimeUnit.MILLISECONDS);
}
/**
* @Description 设置锁的过期时间
* @param key
* @param expTime
* @param unit
* @return Boolean
* @throws
* @date 2019/11/16 21:18
*/
public Boolean renewal(String key,int expTime,TimeUnit unit){
return stringRedisTemplate.expire(key, expTime, unit);
}
/**
* @Description 解锁
* @param key
* @param val
* @return void
* @throws
* @date 2019/11/16 21:13
*/
public void unlock(String key,String val){
try {
String value = stringRedisTemplate.opsForValue().get(key);
if(!Strings.isNullOrEmpty(value) && val.equals(value) ){
// 删除锁状态
stringRedisTemplate.opsForValue().getOperations().delete(key);
}
} catch (Exception e) {
log.error("unlock exception{}",e);
}
}
}
- 模拟样例
package xyz.wongs.weathertop.web;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import xyz.wongs.weathertop.base.message.enums.ResponseCode;
import xyz.wongs.weathertop.base.message.response.ResponseResult;
import xyz.wongs.weathertop.comp.RedisLockComponent;
import xyz.wongs.weathertop.deno.entity.RedisLock;
import xyz.wongs.weathertop.deno.mapper.RedisLockMapper;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
public class RedisController {
@Autowired
private RedisLockMapper redisLockMapper;
@Autowired
private RedisLockComponent redisLockComponent;
/**
* 超时时间 5s
*/
private static final int TIMEOUT = 3;
@RequestMapping(value = "/seckilling/{key}")
public ResponseResult Seckilling(@PathVariable("key") String key){
ResponseResult responseResult = new ResponseResult();
//1、加锁
String value = System.currentTimeMillis() + "";
if(!redisLockComponent.getLock(key,value,6)){
responseResult.setStatus(false);
responseResult.setCode(ResponseCode.DICT_LOCK_FAIL.getCode());
responseResult.setMsg("排队人数太多,请稍后再试.");
return responseResult;
}
RedisLock redisLock = redisLockMapper.selectByPrimaryKey(1);
// 2、查询库存,为0则活动结束
if(redisLock.getCounts()==0){
responseResult.setStatus(false);
responseResult.setCode(ResponseCode.RESOURCE_NOT_EXIST.getCode());
responseResult.setMsg("库存不够.");
return responseResult;
}
//3、减库存
redisLock.setCounts(redisLock.getCounts()-1);
redisLockMapper.updateByPrimaryKeySelective(redisLock);
try{
Thread.sleep(5000);//模拟减库存的处理时间
}catch (InterruptedException e){
e.printStackTrace();
}
//4、释放锁
redisLockComponent.unlock(key,value);
responseResult.setMsg("恭喜您,秒杀成功。");
return responseResult;
}
}
image.png
5.2. redis集群实现
实际上我们为了高可用,降低单机故障概率,肯定将redis集群模式部署,这样情况下我们整合redisson,由于篇幅有限,这里暂时挖个坑,就不探讨这个,下一篇再说
6. Zookeeper实现
在上一节,给大家演示基于Redis的实现,这一章节我们来看另一种分布式锁的实现——Zookeeper。
关于Zookeeper的为什么能实现分布式锁,这里只说明zookeeper核心保存结构是DataTree数据结构,内部实现基于Map<String, DataNode>的数据结构,其他详细内容大家自行取官网查阅,这里也不赘述。
zookeeper原理Zookeeper中提供了节点类型主要有:
-
持久节点:节点创建后,将一直存在,直到有删除操作来主动清除。
-
顺序节点:假如当前一个父节点为/lock,可以在该父节点下面创建子节点;Zookeeper本身提供了可选的有序特性,例如我们可以创建子节点“/lock/test_”并且指明有序,那么Zookeeper在生成子节点时会根据当前子节点数量自动添加整数序号,如果第一个子节点为/lock/test_0000000000,下一个节点则为/lock/test_0000000001,依次类推。
-
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
Zookeeper从诞生至今,市面上也有基于zk作为分布式锁实现的成熟框架,这里我们就以Curator就是代表 ,它就是Netflix开源的一套ZooKeeper客户端框架,它提供zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator为使用者提供操作锁的API,比如获取锁、释放锁等,在实际开发中都非常方便,最后别忘记代码的finally代码块中,要最终确保锁能正确释放即可。
Curator提供了四种分布式锁:
-
InterProcessMutex:分布式可重入排它锁
-
InterProcessSemaphoreMutex:分布式排它锁
-
InterProcessReadWriteLock:分布式读写锁
-
InterProcessMultiLock:将多个锁作为单个实体管理的容器
说了这么多,不上代码,许多老铁恐怕都要犯困,没激情,下来我们用Coding带大家了解Zookeeper如何实现分布式锁,我们还是以Springboot基本框架,写一个案例。
6.1. 引入POM依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
6.2. application文件
curator:
retryCount: 5
elapsedTimeMs: 5000
connectString: 192.168.147.132:2181
sessionTimeoutMs: 60000
connectionTimeoutMs: 5000
6.3. 核心实现
/**
* @Description 获取分布式锁
* @param path 提供可供写入的路径
* @return void
* @throws
* @date 2019/11/4 9:36
*/
public boolean acquireDistributedLock(String path) {
boolean lock = true;
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
while (true) {
try {
curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
log.info("success to acquire lock for path:{}", keyPath);
break;
} catch (Exception e) {
log.info("failed to acquire lock for path:{}", keyPath);
log.info("while try again .......");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lock = false;
}
}
return lock;
}
/**
* @Description
* @param path 释放分布式锁
* @return boolean
* @throws
* @date 2019/11/4 9:36
*/
public boolean releaseDistributedLock(String path) {
boolean release = true;
String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;;
try {
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
log.error("failed to release lock");
release = false;
}
return release;
}
6.4. 演示结果
我用webcontroller实现一个restfull接口,同时打开两个URL获取同一把锁,获取的为成功,否则失败。
@GetMapping("/lock10")
public ResponseResult getLock1() {
ResponseResult responseResult = new ResponseResult();
Boolean acquire = distributedLockByZookeeper.acquireDistributedLock(PATH);
try {
if(acquire) {
log.error("I am lock1,i am updating resource……!!!");
Thread.sleep(2000);
} else{
responseResult.setCode(ResponseCode.SYSNC_LOCK.getCode());
responseResult.setMsg(ResponseCode.SYSNC_LOCK.getMsg());
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
distributedLockByZookeeper.releaseDistributedLock(PATH);
}
return responseResult;
}
获取锁成功
未抢到锁
7. 三种锁的比较
这三种方式都不是尽善尽美,如同CAP,在复杂性、可靠性、性能等方面皆无法同时满足,所以,根据实际业务情况选择最适合的方案,优雅的解决问题,少带来弊端即可。
-
实现难易程度自低至高:数据库 、 缓存 、 Zookeeper
-
实现的复杂性自低至高:Zookeeper 、缓存 、数据库
-
从性能角度自高到低:缓存 、Zookeeper 、 数据库
-
从可靠性角度自高到低:Zookeeper 、缓存 、数据库
在技术层面Redis是nosql数据,而Zookeeper是分布式协调工具,都用于分布式解决方案;防死锁的实现上Redis是通过对key设置有效期来解决死锁,而Zookeeper使用会话有效期方式解决死锁现象;数据库的实现上,高并发过程中对库的压力会增大
8. 源码
Github演示源码 ,记得给Star。
网友评论