之前总结过一篇利用Redis的事务特性和Watch实现CAS乐观锁的Case,除了用事务和Watch实现锁还有更简单的实现——基于Redis的悲观锁主要是依靠SETNX命令来实现。
SETNX:
  SETNX是Redis提供的一种类Set的命令,不同的是这个命令只会在键不存在的情况下为键设置值,官方对SETNX的解释如下:
当然只有这一个命令还是不够的,我们还需要结合事务进行锁的释放,当然归根结底最重要的性质还是Redis的单线程的性质。
在了解了SETNX之后,我们需要用Jedis实现一下简单的分布式锁,代码如下:
package com.zhiming.redis.lock;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;
public class RedisSampleLock {
private static final String redisHost = "10.0.3.67";
private static final int port = 6381;
private static JedisPoolConfig config;
private static JedisPool pool;
private static ExecutorService service;
private static int ThLeng=10;
private static CountDownLatch latch;
private static AtomicInteger Countor = new AtomicInteger(0);
private static int count = 0;
private static String LockName = "mylock_test10";
static{
//利用Redis连接池,保证多个线程利用多个连接,充分模拟并发性
config = new JedisPoolConfig();
config.setMaxIdle(10);
config.setMaxWaitMillis(1000);
config.setMaxTotal(30);
pool = new JedisPool(config, redisHost, port);
//利用ExecutorService 管理线程
service = Executors.newFixedThreadPool(ThLeng);
//CountDownLatch保证主线程在全部线程结束之后退出
latch = new CountDownLatch(ThLeng);
}
/**
* 獲取鎖
* tips:生成一个UUID,作为Key的标识,不断轮询lockName,直到set成功,表示成功获取锁。
* 其他的线程在set此lockName时被阻塞直到超时。
* @param pool
* @param lockName
* @param timeouts
* @return 鎖標識
*/
public static String getLock(JedisPool pool,String lockName,long timeouts){
Jedis client = pool.getResource();
try{
String value = UUID.randomUUID().toString();
long timeWait = System.currentTimeMillis() + timeouts*1000;
while(System.currentTimeMillis()<timeWait){
if(client.setnx(lockName, value) == 1){
System.out.println("lock geted");
return value;
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("get lock timeouts");
}finally{
//pool.returnBrokenResource(client);
pool.returnResource(client);
}
return null;
}
/**
* 釋放鎖
* tips:对lockName做watch,开启一个事务,删除以LockName为key的锁,删除后,此锁对于其他线程为可争抢的。
*
* @param pool
* @param lockName
* @param value
*/
public static void relaseLock(JedisPool pool,String lockName,String value){
Jedis client = pool.getResource();
try{
while(true){
client.watch(lockName);
if (client.get(lockName).equals(value)){
Transaction tx = client.multi();
tx.del(lockName);
tx.exec();
return;
}
client.unwatch();
}
}finally{
//pool.returnBrokenResource(client);
pool.returnResource(client);
}
}
public static void main(String args[]){
for(int i=0;i<ThLeng;i++){
String tName = "thread-"+i;
Thread t = new Thread(new SubAddThread(pool,tName));
System.out.println(tName+"inited...");
service.submit(t);
}
service.shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Countor.get());
System.out.println(count);
}
public static class SubAddThread implements Runnable{
private String name;
private JedisPool pool;
public SubAddThread(JedisPool pool,String uname){
this.pool = pool;
this.name = uname;
}
public void run() {
for(int i=0;i<100;i++){
System.out.println(name+" starting...");
String valuse = getLock(pool,LockName,50);
System.out.println(name+" get Lock "+valuse);
count++;
relaseLock(pool,LockName,valuse);
Countor.incrementAndGet();
System.out.println(name+" "+count);
}
latch.countDown();
System.out.println(name+" complated");
}
}
}
主要是利用SetNx的特性实现,上面的实现还是有很多问题,但是说明了Redis实现分布式锁的思想。经过测试,上面的代码保证了count++的原子性,最后输出结果和AtomicInteger的实例Counter输出的一致。
网友评论