导言
后端程序员多多少少都与各种各样的“锁”打过交道,如: 电商网站在下单时对某个商品减库存,必须加锁防止多个客户同时下单导致库存量错误; 抢分享红包时必须对红包加锁防止用户抢到的金额超过红包上限等。
对于单体或部署在单台服务器上的应用而言,对某种资源加锁相对简单,使用语言本身的并发控制机制或是文件锁即可实现。然而为了提高响应速度以及防止单点,当前的应用常常会在多台服务器上部署多个实例,这时就需要进行分布式的并发控制了。
一种实现分布式锁的思路利用了redis的setnx
命令,除了这种思路以外,Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。由于redis是单线程的,单次执行的lua脚本可以认为是原子的,redis的java客户端redisson就利用了这一特性实现了分布式锁。
redisson的加锁原理
redisson加锁的逻辑在这里,如果释放锁的时间不是-1,则直接试着加锁,否则认为该锁的超时时间为无穷大,在成功获得锁之后每隔一段时间刷新锁的过期时间。
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime,
TimeUnit unit,
final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime,
unit,
threadId,
RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture
= tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS,
threadId,
RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Boolean ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
加锁的过程主要是靠下面这段lua脚本:
<T> RFuture<T> tryLockInnerAsync(long leaseTime,
TimeUnit unit,
long threadId,
RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime,
getLockName(threadId));
}
上面的KEYS[1]为加锁时指定的锁的名称,argv[1]是internalLockLeaseTime,argv[2]是thread_id。这段逻辑为:如果锁不存在,则hset 锁名称 线程id 1
,直接返回。 如果hexists 锁名称 线程id
返回1,则说明该客户端已经加过锁了,则将自己线程id对应的值增加1,并重新设置该key的超时时间,实现可重入。否则说明该锁已经被其他客户端获取,返回锁还有多久过期。如果加锁失败,客户端可以选择利用redis的pub/sub机制监听一个channel,在监听到锁被释放后尝试重新加锁,或者返会失败让客户端决定下一步操作。
解锁时KEY[1]为锁名称,KEY[2]为关注的队列名称,ARGV[1]为解锁消息,ARGV[2]为超时时间,ARGV[3]为线程:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(),
getChannelName()),
LockPubSub.unlockMessage,
internalLockLeaseTime, getLockName(threadId));
}
解锁的时候如果锁已经超时,则通知其他等待获取该锁的客户端并返回1。如果该锁没有被当前客户端获得,则什么都不做直接返回null。如果当期客户端拥有锁,则为了可重入,先减少一次当前线程获取锁的次数,如果unlock次数小于lock次数,则重新设置锁的超时时间,返回0,如果锁已经被彻底释放,则删除掉锁,并通知其他等待获取锁的客户端,并返回1表示解锁成功。
总体而言,redisson的锁防止了由于客户端死掉而导致的锁永久被占用,还支持可重入锁,功能上还是比较完备的。
封装redisson
redisson的分布式锁从功能上说已经够用了,为了方便使用以及与spring集成,我们需要再将其做一次封装,由于redis有集群、单机、sentinel几种模式,我们将加锁功能抽象起来,提供统一的接口。话不多说,我们之间上代码:
/**
* Created by millions on 2017/1/6.
* 分布式锁
*/
public interface DistributedLock {
/**
* 使用分布式锁,获取锁后执行callback指定的业务逻辑,使用锁默认超时时间。
*
* @param callback 完成回调
* @return
*/
<T> T lock(DistributedLockCallback<T> callback);
/**
* 使用分布式锁,获取锁后执行callback指定的业务逻辑,使用锁默认超时时间。
*
* @param callback
* @param leaseTime 锁超时时间。超时后自动释放锁。
* @param timeUnit 超时时间单位
* @return
*/
<T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit);
}
我们将加锁后需要的逻辑封装到callback中:
/** * Created by millions on 2017/1/6. * 分布式锁回调接口 */
public interface DistributedLockCallback<T> {
T process();
String getLockName();
}
实现单机redis的分布式锁:
public class SingleDistributedLock implements DistributedLock {
private static final long DEFAULT_TIMEOUT = 5;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
private RedissonClient redissonClient;
public SingleDistributedLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
public SingleDistributedLock() {
}
public <T> T lock(DistributedLockCallback<T> callback) {
return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT);
}
public <T> T lock(DistributedLockCallback<T> callback, long leaseTime,
TimeUnit timeUnit) {
RLock lock = null;
try {
lock = redissonClient.getLock(callback.getLockName());
lock.lock(leaseTime, timeUnit);
return callback.process();
} finally {
if (null != lock) {
lock.unlock();
}
}
}
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
}
为了方便的和spring整合,我们实现spring的FactoryBean接口
public class DistributedLockFactoryBean implements FactoryBean<DistributedLock> {
private static final Logger logger
= LoggerFactory.getLogger(DistributedLockFactoryBean.class);
private RedissonClient client;
private LockModel lockModel;
private DistributedLock lock;
public DistributedLockFactoryBean(String modelName) {
lockModel = LockModel.fromString(modelName);
}
@PostConstruct
public void init() {
logger.debug("DistributedLockFactoryBean is initializing!");
//从classpath下读取RedissionClient配置
InputStream inputStream = null;
Config config = null;
try {
inputStream = DistributedLockFactoryBean
.class
.getClassLoader()
.getResourceAsStream("redis-config.json");
config = Config.fromJSON(inputStream);
} catch (IOException e) {
logger.error("读取redission配置失败");
} finally {
if (null != inputStream) {
try {
inputStream.close();
} catch (IOException e) {
logger.info("", e);
}
}
}
Assert.notNull(config);
//初始化client
client = Redisson.create(config);
}
@PreDestroy
public void destroy() {
logger.debug("DistributedLockFactoryBean will be destroyed!");
client.shutdown();
}
public DistributedLock getObject() throws Exception {
Class<? extends DistributedLock> clz = lockModel.lockClass();
lock = clz.newInstance();
Method setRedissonClient
= clz.getMethod("setRedissonClient", RedissonClient.class);
if (null != setRedissonClient) {
setRedissonClient.invoke(lock, client);
return lock;
}
throw new InvalidObjectException(clz.getSimpleName()+"不支持的RedissonClient");
}
public Class<?> getObjectType() {
return DistributedLock.class;
}
public boolean isSingleton() {
return true;
}
private enum LockModel {
SINGLE {
Class<? extends DistributedLock> lockClass() {
return SingleDistributedLock.class;
}
};
abstract Class<? extends DistributedLock> lockClass();
public static LockModel fromString(String modelName) {
LockModel[] values = LockModel.values();
for (LockModel model : values) {
if (model.name().equalsIgnoreCase(modelName)) {
return model;
}
}
throw new InvalidParameterException("找不到合适的LockModel");
}
}
}
使用时引入jar包,在resource目录下放入redis配置文件,配置好factoryBean 就可以愉快的使用啦~
{
"singleServerConfig": {
"idleConnectionTimeout": 10000,
"pingTimeout": 1000,
"connectTimeout": 10000,
"timeout": 3000,
"retryAttempts": 3,
"retryInterval": 1500,
"reconnectionTimeout": 3000,
"failedAttempts": 3,
"password": null,
"subscriptionsPerConnection": 5,
"clientName": null,
"address": "redis://192.168.0.133:6379",
"subscriptionConnectionMinimumIdleSize": 1,
"subscriptionConnectionPoolSize": 50,
"connectionMinimumIdleSize": 10,
"connectionPoolSize": 64,
"database": 0,
"dnsMonitoring": false,
"dnsMonitoringInterval": 5000
},
"threads": 0,
"nettyThreads": 0,
"codec": null,
"useLinuxNativeEpoll": false //redisson使用netty 需要指定是否使用epoll模式
}
总结
总结一下,redisson利用redis工作线程为单线程,lua脚本能够原子执行的这一特性,利用lua脚本和redis的pub/sub特性实现了分布式、可重入的锁。在redisson的基础上进行了封装,并添加了与spring的整合。这个分布式锁没用什么高大上的原理,代码也写的很屌丝,但是足以支撑一般规模的业务。从这里我想到,很多时候业务上提的一些需求,我们其实都应该多思考一下,将其抽象,整合成一般的工具库,加快自己以及同事的开发效率。从另外一个角度来说,需求方可能也要给码农们稍微充裕一点的时间,多想,多思考,这样才能更好的推进公司的发展。
网友评论