本文不对的概念进行介绍,主要介绍分布式锁的实现。
分布式锁主要有三种实现方式:
- 基于数据库的分布式锁
- 基于Redis的分布式锁
- 基于Zookeeper的分布式锁
基于数据库的分布式锁使用并不是很广泛,本文主要介绍后面两种分布式锁的实现方式。
基于Redis的分布式锁
加锁操作主要是基于Redis setnx命令式原子操作,解锁操作则是基于Redis执行lua脚本也具有原子性,实现分布式锁的关键点就是要保证加锁和解锁操作都具有原子性,代码实现如下:
@Component
public class RedisDistributeLock {
@Autowired
JedisPool jedisPool;
private long internalLockLeaseTime = 30000;
private SetParams params = SetParams.setParams().nx().px(internalLockLeaseTime);
private static final String LOCK_SUCCESS = "OK";
private static final String LOCK_SUCCESS_FLAG = "1";
private long timeout = 999999;
private String redisKey = "redis_lock";
public boolean lock(String id) {
Long start = System.currentTimeMillis();
try (Jedis jedis = jedisPool.getResource()) {
for (; ; ) {
//SET命令返回OK ,则证明获取锁成功
String lock = jedis.set(redisKey, id, params);
if (LOCK_SUCCESS.equals(lock)) {
return true;
}
//否则循环等待,在timeout时间内仍未获取到锁,则获取失败
long l = System.currentTimeMillis() - start;
if (l >= timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public boolean unlock(String id) {
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try (Jedis jedis = jedisPool.getResource()) {
Object result = jedis.eval(script, Collections.singletonList(redisKey),
Collections.singletonList(id));
return LOCK_SUCCESS_FLAG.equals(result.toString());
}
}
}
在主线程中创建两个线程并启动,两个线程抢占同一个分布式锁
@Component
public class RedisLockService {
@Autowired
RedisDistributeLock redisDistributeLock;
private String lockKey = "lock_key";
private String requestId = "my_request_id_1";
@PostConstruct
public void testLock() throws InterruptedException {
Thread t1 = new Thread(() -> {
redisDistributeLock.lock( requestId);
System.out.println("线程1获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisDistributeLock.unlock( requestId);
});
Thread t2 = new Thread(() -> {
redisDistributeLock.lock( requestId);
System.out.println("线程2获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisDistributeLock.unlock( requestId);
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
控制台输出结果如下,结果显示在线程1获取锁成功5秒之后,线程1释放了分布式锁,线程2获取到了分布式锁。时间相差5s左右,与sleep的时间吻合。
线程1获取分布式锁成功,获取时间为:2020-06-01T11:02:24.394Z
线程2获取分布式锁成功,获取时间为:2020-06-01T11:02:29.494Z
Redis官方推荐使用Redisson实现分布式锁,基于Jedis原生API实现的分布式锁无法支持可重入功能,Redisson在原生API的基础上进行了封装,为分布式系统提供了很多实用的功能,分布式锁就是其中一个,Redisson分布式锁的使用方式如下:
@Component
public class RedissonDistributeLock {
private RedissonClient client = null;
@PostConstruct
public void initRLock() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//redis没有设置密码,加上下面这一行链接不上redis
//config.useSingleServer().setPassword(standaloneRedisConfig.getPassword());
client = Redisson.create(config);
}
public void lock(String lockKey) {
RLock lock = client.getLock(lockKey);
lock.lock();
}
public void unlock(String lockKey) {
RLock lock = client.getLock(lockKey);
lock.unlock();
}
}
在主线程中创建两个线程并启动,两个线程抢占同一个分布式锁
@Component
public class RedissonLockService {
@Autowired
RedissonDistributeLock redissonDistributeLock;
private static final String lockKey = "lock_key";
@PostConstruct
public void testRedissonDistributeLock() throws InterruptedException {
Thread t1 = new Thread(() -> {
redissonDistributeLock.lock(lockKey);
System.out.println("线程1获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redissonDistributeLock.unlock(lockKey);
});
Thread t2 = new Thread(() -> {
redissonDistributeLock.lock(lockKey);
System.out.println("线程2获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redissonDistributeLock.unlock(lockKey);
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
线程2获取分布式锁成功,获取时间为:2020-06-01T11:57:31.951Z
线程1获取分布式锁成功,获取时间为:2020-06-01T11:57:36.971Z
JetCache是阿里巴巴提供的开源的缓存框架,也提供了分布式锁的功能,使用如下
@Component
@DependsOn(value = "globalCacheConfig")
public class JetCacheLockService {
private static final String LOCK_STR = "jetcache_lock_1";
/**
* 分布式锁
*/
@CreateCache(name = "lock:", cacheType = CacheType.REMOTE)
private Cache<String, String> lockCache;
@PostConstruct
public void testJetCacheLock() throws InterruptedException {
//lockCache是懒加载的
lockCache.put("hhhh", "jjjjjjjjjj");
Thread t1 = new Thread(() -> {
boolean getKey = lockCache.tryLockAndRun(LOCK_STR, 30, TimeUnit.SECONDS, () -> {
System.out.println("线程1获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
if (getKey) {
System.out.println("线程1获取到了分布式锁");
} else {
System.out.println("线程1获取分布式锁失败");
}
});
Thread t2 = new Thread(() -> {
boolean getKey = lockCache.tryLockAndRun(LOCK_STR, 30, TimeUnit.SECONDS, () -> {
System.out.println("线程2获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
if (getKey) {
System.out.println("线程2获取到了分布式锁");
} else {
System.out.println("线程2获取分布式锁失败");
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
控制台输出结果如下,和上面两种分布式锁不一样,jetcache的分布式锁获取锁失败后不会等待,而是直接执行获取锁失败的逻辑,使用过程中可以根据具体的业务需要选择合适的分布式锁。
线程2获取分布式锁失败
线程1获取分布式锁成功,获取时间为:2020-06-02T08:42:01.783Z
线程1获取到了分布式锁
基于Zookeeper的分布式锁
关于zookeeper分布式锁,可以直接使用Curator框架提供的Mutex,支持可重入,公平锁等功能,使用如下:
public class CuratorDistributeLock {
public static void main(String[] args) throws InterruptedException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//client和client2模拟不同服务实例
CuratorFramework client = CuratorFrameworkFactory.newClient("10.89.232.9:2181",retryPolicy);
client.start();
CuratorFramework client2 = CuratorFrameworkFactory.newClient("10.89.232.9:2181",retryPolicy);
client2.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client,"/curator/distributeLock");
final InterProcessMutex mutex2 = new InterProcessMutex(client2,"/curator/distributeLock");
Thread t1 = new Thread(() -> {
try {
mutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程1获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
mutex2.acquire();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("线程2获取分布式锁成功,获取时间为:" + Instant.now().toString());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
mutex2.release();
} catch (Exception e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
//关闭客户端
client.close();
client2.close();
}
}
总结
本文主要介绍了分布式锁的四种使用方法,并展示了每一种用法及其作用。
网友评论