Redis实现分布式锁

作者: 一只小哈 | 来源:发表于2016-07-24 12:13 被阅读4916次

之前总结过一篇利用Redis的事务特性和Watch实现CAS乐观锁的Case,除了用事务和Watch实现锁还有更简单的实现——基于Redis的悲观锁主要是依靠SETNX命令来实现。

SETNX:
&nbsp SETNX是Redis提供的一种类Set的命令,不同的是这个命令只会在键不存在的情况下为键设置值,官方对SETNX的解释如下:

Paste_Image.png

当然只有这一个命令还是不够的,我们还需要结合事务进行锁的释放,当然归根结底最重要的性质还是Redis的单线程的性质。

在了解了SETNX之后,我们需要用Jedis实现一下简单的分布式锁,代码如下:

package com.zhiming.redis.lock;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;

public class RedisSampleLock {
private static final String redisHost = "10.0.3.67";
    
    private static final int port = 6381;
    
    private static JedisPoolConfig config;
    
    private static JedisPool pool;
    
    private static ExecutorService service;
    
    private static int ThLeng=10;
    
    private static CountDownLatch latch;
    
    private static AtomicInteger Countor = new AtomicInteger(0);
    
    private static int count = 0;
    
    private static String LockName = "mylock_test10";
    
    static{
        //利用Redis连接池,保证多个线程利用多个连接,充分模拟并发性
        config = new JedisPoolConfig();
        config.setMaxIdle(10);
        config.setMaxWaitMillis(1000);
        config.setMaxTotal(30);
        pool = new JedisPool(config, redisHost, port);
        //利用ExecutorService 管理线程
        service = Executors.newFixedThreadPool(ThLeng);
        //CountDownLatch保证主线程在全部线程结束之后退出
        latch = new CountDownLatch(ThLeng);
    }
    
    /**
     * 獲取鎖
     * tips:生成一个UUID,作为Key的标识,不断轮询lockName,直到set成功,表示成功获取锁。
     *       其他的线程在set此lockName时被阻塞直到超时。
     * @param pool
     * @param lockName
     * @param timeouts
     * @return 鎖標識
     */
    public static String getLock(JedisPool pool,String lockName,long timeouts){
        Jedis client = pool.getResource();
        try{
            String value = UUID.randomUUID().toString();
            long timeWait = System.currentTimeMillis() + timeouts*1000;
            while(System.currentTimeMillis()<timeWait){
                if(client.setnx(lockName, value) == 1){
                    System.out.println("lock geted");
                    return value;
                }
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("get lock timeouts");
        }finally{
            //pool.returnBrokenResource(client);
            pool.returnResource(client);
        }
        return null;
    }
    
    /**
     * 釋放鎖
     * tips:对lockName做watch,开启一个事务,删除以LockName为key的锁,删除后,此锁对于其他线程为可争抢的。
     * 
     * @param pool
     * @param lockName
     * @param value
     */
    public static void relaseLock(JedisPool pool,String lockName,String value){
        Jedis client = pool.getResource();
        try{
        while(true){
            client.watch(lockName);
            if (client.get(lockName).equals(value)){
                Transaction tx = client.multi();
                tx.del(lockName);
                tx.exec();
                return;
            }
            client.unwatch();
        }
        }finally{
            //pool.returnBrokenResource(client);
            pool.returnResource(client);
        }
    }
    
    
    
    public static void main(String args[]){
        for(int i=0;i<ThLeng;i++){
            String tName = "thread-"+i;
            Thread t = new Thread(new SubAddThread(pool,tName));
            System.out.println(tName+"inited...");
            service.submit(t);
        }
        service.shutdown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Countor.get());
        System.out.println(count);
    }
    
    
    public static class SubAddThread implements Runnable{
        
        private String name;
        
        private JedisPool pool;
        
        public SubAddThread(JedisPool pool,String uname){
            this.pool = pool;
            this.name = uname;
        }
        
        public void run() {
            for(int i=0;i<100;i++){
                System.out.println(name+" starting...");
                String valuse = getLock(pool,LockName,50);
                System.out.println(name+" get Lock "+valuse);
                count++;
                relaseLock(pool,LockName,valuse);
                Countor.incrementAndGet();
                System.out.println(name+" "+count);
            }
            latch.countDown();
            System.out.println(name+" complated");
        }
        
    }
}

主要是利用SetNx的特性实现,上面的实现还是有很多问题,但是说明了Redis实现分布式锁的思想。经过测试,上面的代码保证了count++的原子性,最后输出结果和AtomicInteger的实例Counter输出的一致。

相关文章

网友评论

    本文标题:Redis实现分布式锁

    本文链接:https://www.haomeiwen.com/subject/okjijttx.html