美文网首页
zk与redis实现分布式锁

zk与redis实现分布式锁

作者: tracy_668 | 来源:发表于2018-08-26 23:25 被阅读215次

    分布式锁

    在同一个jvm中,jdk已经提供了lock、synchronized等同步机制,但是在分布式环境下,分布在不同机器上的多个进程可能对一些资源产生竞争关系,无法再使用jdk提供的同步机制,分布式锁就是用来解决这种场景下的同步问题。

    利用zk实现分布式锁思路

    1. 建立一个名为lock的持久节点(Persistent)
    2. 当进程需要访问共享资源时,会先在lock节点下创建临时顺序节点,然后对lock节点下所有的子节点进行按序号排序,如果该进程创建的临时节点是所有子节点序号最小的,该进程获得锁进入临界区,执行任务后删除对应的临时顺序节点
    3. 如果序号不是最小的,就获得该节点序号的上一个序号对应节点,并给该节点是否存在注册监听事件,等待监听到其上个节点被删除后,重新去获取锁,从而进入临界区执行任务,执行后同样删除所创建的临时节点,这里只去监听比自己节点序号小1的节点,不用去监听所有的节点。

    代码实现

    详细代码请点击zk实现分布式锁

    public class DistributedLock implements Watcher {
        private int threadId;
        private ZooKeeper zk = null;
        private String selfPath;
        private String waitPath;
        private String LOG_PREFIX_OF_THREAD;
        private static final int SESSION_TIMEOUT = 10000;
        private static final String GROUP_PATH = "/locks";
        private static final String SUB_PATH = "/locks/sub";
        private static final String CONNECTION_STRING = "ubuntu:2181";
    
        private static final int THREAD_NUM = 10;
        // 确保连接zk成功;
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);
        // 确保所有线程运行结束;
        private static final CountDownLatch threadSemaphore = new CountDownLatch(
                THREAD_NUM);
    
        public DistributedLock(int id) {
            this.threadId = id;
            LOG_PREFIX_OF_THREAD = "【第" + threadId + "个线程】";
        }
    
        public static void main(String[] args) {
                    // 用多线程模拟分布式环境
            for (int i = 0; i < THREAD_NUM; i++) {
                final int threadId = i + 1;
                new Thread() {
                    @Override
                    public void run() {
                        try {
                            DistributedLock dc = new DistributedLock(threadId);
                            dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
                            // GROUP_PATH不存在的话,由一个线程创建即可;
                            synchronized (threadSemaphore) {
                                dc.createPath(GROUP_PATH, "该节点由线程" + threadId
                                        + "创建", true);
                            }
                            dc.getLock();
                        } catch (Exception e) {
                            System.out.println("【第" + threadId + "个线程】 抛出的异常:");
                            e.printStackTrace();
                        }
                    }
                }.start();
            }
            try {
                threadSemaphore.await();
                System.out.println("所有线程运行结束!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    /**
         * 获取锁
         *
         * @return
         */
        private void getLock() throws KeeperException, InterruptedException {
                    // 去创建临时节点
            selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(LOG_PREFIX_OF_THREAD + "创建锁路径:" + selfPath);
            if (checkMinPath()) {
                getLockSuccess();
            }
        }
    
        /**
         * 创建节点
         *
         * @param path 节点path
         * @param data 初始数据内容
         * @return
         */
        public boolean createPath(String path, String data, boolean needWatch)
                throws KeeperException, InterruptedException {
            if (zk.exists(path, needWatch) == null) {
                System.out.println(LOG_PREFIX_OF_THREAD
                        + "节点创建成功, Path: "
                        + this.zk.create(path, data.getBytes(),
                                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
                        + ", content: " + data);
            }
            return true;
        }
    
        /**
         * 创建ZK连接
         *
         * @param connectString ZK服务器地址列表
         * @param sessionTimeout Session超时时间
         */
        public void createConnection(String connectString, int sessionTimeout)
                throws IOException, InterruptedException {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            connectedSemaphore.await();
        }
    
        /**
         * 获取锁成功
         */
        public void getLockSuccess() throws KeeperException, InterruptedException {
            if (zk.exists(this.selfPath, false) == null) {
                System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了...");
                return;
            }
            System.out.println(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");
            Thread.sleep(2000);
            System.out.println(LOG_PREFIX_OF_THREAD + "删除本节点:" + selfPath);
            zk.delete(this.selfPath, -1);
            releaseConnection();
            threadSemaphore.countDown();
        }
    
        /**
         * 关闭ZK连接
         */
        public void releaseConnection() {
            if (this.zk != null) {
                try {
                    this.zk.close();
                } catch (InterruptedException e) {
                }
            }
            System.out.println(LOG_PREFIX_OF_THREAD + "释放连接");
        }
    
        /**
         * 检查自己是不是最小的节点
         *
         * @return
         */
        public boolean checkMinPath() throws KeeperException, InterruptedException {
            List<String> subNodes = zk.getChildren(GROUP_PATH, false);
            Collections.sort(subNodes);
            int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));
            switch (index) {
            case -1: {
                System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了..." + selfPath);
                return false;
            }
            case 0: {
                System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,我果然是老大...哈哈哈" + selfPath);
                return true;
            }
            default: {
                this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);
                System.out.println(LOG_PREFIX_OF_THREAD + "获取子节点中,排在我前面的。。。"
                        + waitPath);
                try {
                    zk.getData(waitPath, true, new Stat());
                    return false;
                } catch (KeeperException e) {
                    if (zk.exists(waitPath, false) == null) {
                        System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,排在我前面的。。。"
                                + waitPath + "已失踪,幸福来得太突然?");
                        return checkMinPath();
                    } else {
                        throw e;
                    }
                }
            }
    
            }
    
        }
    
        @Override
        public void process(WatchedEvent event) {
                   // 监听器处理事件 
            if (event == null) {
                return;
            }
            Event.KeeperState keeperState = event.getState();
            Event.EventType eventType = event.getType();
            if (Event.KeeperState.SyncConnected == keeperState) {
                if (Event.EventType.None == eventType) {
                    System.out.println(LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器");
                    connectedSemaphore.countDown();
                } else if (event.getType() == Event.EventType.NodeDeleted
                        && event.getPath().equals(waitPath)) {
                    System.out.println(LOG_PREFIX_OF_THREAD
                            + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");
                    try {
                        if (checkMinPath()) {
                            getLockSuccess();
                        }
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } else if (Event.KeeperState.Disconnected == keeperState) {
                System.out.println(LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接");
            } else if (Event.KeeperState.AuthFailed == keeperState) {
                System.out.println(LOG_PREFIX_OF_THREAD + "权限检查失败");
            } else if (Event.KeeperState.Expired == keeperState) {
                System.out.println(LOG_PREFIX_OF_THREAD + "会话失效");
            }
        }
    }
    
    

    可以看到代码还是有点复杂的,通常线上很好会使用zk来实现分布式锁,redis作为一种较更简单方便的方式常常被使用。

    基于redis的SetNX实现分布式锁原理:

    setNX是Redis提供的一个原子操作,如果指定key存在,那么setNX失败,如果不存在会进行Set操作并返回成功。我们可以利用这个来实现一个分布式的锁,主要思路就是,set成功表示获取锁,set失败表示获取失败,失败后需要重试。

    package lock;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import redis.clients.jedis.Jedis;
    
    /**
     * Redis分布式锁
     */
    public class RedisLockTest {
    
        private Jedis jedisCli = new Jedis("192.168.58.99", 6379);
    
        private int expireTime = 1;
    
        /**
         * 获取锁
         *
         * @param lockID
         * @return
         */
        public boolean lock(String lockID) {
            while (true) {
                long returnFlag = jedisCli.setnx(lockID, "1");
                if (returnFlag == 1) {
                    System.out.println(Thread.currentThread().getName() + " get lock....");
                    return true;
                }
                System.out.println(Thread.currentThread().getName() + " is trying lock....");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return false;
                }
            }
        }
    
        /**
         * 超时获取锁
         *
         * @param lockID
         * @param timeOuts
         * @return
         */
        public boolean lock(String lockID, long timeOuts) {
            long current = System.currentTimeMillis();
            long future = current + timeOuts;
            long timeStep = 500;
            CountDownLatch latch = new CountDownLatch(1);
            while (future > current) {
                long returnFlag = jedisCli.setnx(lockID, "1");
                if (returnFlag == 1) {
                    System.out.println(Thread.currentThread().getName() + " get lock....");
                    jedisCli.expire(lockID, expireTime);
                    return true;
                }
                System.out.println(Thread.currentThread().getName() + " is trying lock....");
                try {
                    latch.await(timeStep, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                current = current + timeStep;
            }
            return false;
        }
    
        public void unlock(String lockId) {
            long flag = jedisCli.del(lockId);
            if (flag > 0) {
                System.out.println(Thread.currentThread().getName() + " release lock....");
            } else {
                System.out.println(Thread.currentThread().getName() + " release lock fail....");
            }
        }
    
        /**
         * 线程工厂,命名线程
         */
        public static class MyThreadFactory implements ThreadFactory {
            public static AtomicInteger count = new AtomicInteger();
    
            @Override
            public Thread newThread(Runnable r) {
                count.getAndIncrement();
                Thread thread = new Thread(r);
                thread.setName("Thread-lock-test " + count);
                return thread;
            }
        }
    
        public static void main(String args[]) {
            final String lockID = "lockTest";
            Runnable task = () -> {
                RedisLockTest testCli = new RedisLockTest();
                testCli.lock(lockID);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                testCli.unlock(lockID);
            };
    
            MyThreadFactory factory = new MyThreadFactory();
            ExecutorService services = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 3; i++)
                services.execute(factory.newThread(task));
        }
    
    }
    

    实现起来比较简单明了,并且对于锁删除失败(分布式锁基本都有这个问题),可以对key设置失效时间,这个超时时间需要能保证获得锁的这个进程已经获取完了竞争资源。相比zk的实现唯一不足的地方是没有通知机制,需要不断的轮询和睡眠去获取锁。

    相关文章

      网友评论

          本文标题:zk与redis实现分布式锁

          本文链接:https://www.haomeiwen.com/subject/hzpwiftx.html