美文网首页
分布式环境下的独占锁,共享锁

分布式环境下的独占锁,共享锁

作者: sadamu0912 | 来源:发表于2019-03-05 20:50 被阅读0次

    基于zookeeper分布式环境下的锁,原理大概是:zookeeper的临时顺序节点。当session结束的时候,临时节点自动删除。发送watchEvent通知给客户端。然后占据临时顺序节点,索引为0的地方,这个进程就占有锁。其他的等待。当锁释放的时候,节点删除。后面一个变成锁的拥有者。主要是利用了zookeeper的临时顺序节点的原子递增性。基于zookeeper的分布式唯一id生成器,也是这个原理。

    基于zookeeper的分布式独占锁

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 基于zookeeper的分布式独占锁
     */
    public class ZKDistributeLock {
    
        private final ZkClient client;
        private final String path;
        private final String basePath;
        private final String lockName;
        private String ourLockPath;
    
        /** 重试获取锁次数 */
        private static final Integer MAX_RETRY_COUNT = 10;
        private static final String LOCK_NAME = "lock-";
    
        public ZKDistributeLock(ZkClient client, String basePath) {
            this.client = client;
            this.basePath = basePath;
            this.path = basePath.concat("/").concat(LOCK_NAME);
            this.lockName = LOCK_NAME;
        }
    
        public void getLock() throws Exception {
            // -1 表示永不超时
            ourLockPath = tryGetLock(-1, null);
            if(ourLockPath == null){
                throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
            }
        }
    
        public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
            ourLockPath = tryGetLock(timeOut, timeUnit);
            return ourLockPath != null;
        }
    
        public void releaseLock() throws Exception {
            releaseLock(ourLockPath);
        }
    
        /**
         * 等待获取锁
         * @param startMillis
         * @param millisToWait
         * @param ourPath
         * @return
         * @throws Exception
         */
        private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {
    
            // 是否得到锁
            boolean haveTheLock = false;
            // 是否需要删除当前锁的节点
            boolean doDeleteOurPath = false;
    
            try {
    
                while (!haveTheLock) {
    
                    // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                    List<String> children = getSortedChildren();
    
                    // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                    String sequenceNodeName = ourPath.substring(basePath.length() + 1);
    
                    // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                    int ourIndex = children.indexOf(sequenceNodeName);
                    if (ourIndex < 0) {
                        // 可能网络闪断 抛给上层处理
                        throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                    }
    
                    boolean isGetTheLock = (ourIndex == 0);
    
                    if (isGetTheLock) {
                        // 如果第一位 已经获得锁
                        haveTheLock = true;
                    } else {
                        // 如果不是第一位,监听比自己小的那个节点的删除事件
                        String pathToWatch = children.get(ourIndex - 1);
                        String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                        final CountDownLatch latch = new CountDownLatch(1);
                        final IZkDataListener previousListener = new IZkDataListener() {
    
                            public void handleDataDeleted(String dataPath) throws Exception {
                                latch.countDown();
                            }
    
                            public void handleDataChange(String dataPath, Object data) throws Exception {
                            }
                        };
    
                        try {
                            client.subscribeDataChanges(previousSequencePath, previousListener);
    
                            if (millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait <= 0) {
                                    doDeleteOurPath = true;
                                    break;
                                }
    
                                latch.await(millisToWait, TimeUnit.MICROSECONDS);
                            } else {
                                latch.await();
                            }
                        } catch (ZkNoNodeException e) {
                            e.printStackTrace();
                        } finally {
                            client.unsubscribeDataChanges(previousSequencePath, previousListener);
                        }
    
                    }
                }
            } catch (Exception e) {
                //发生异常需要删除节点
                doDeleteOurPath = true;
                throw e;
            } finally {
                //如果需要删除节点
                if (doDeleteOurPath) {
                    deleteOurPath(ourPath);
                }
            }
    
            return haveTheLock;
        }
    
        /**
         * 获取所有锁节点(/locker下的子节点)并排序
         *
         * @return
         * @throws Exception
         */
        private List<String> getSortedChildren() throws Exception {
            try {
    
                List<String> children = client.getChildren(basePath);
                Collections.sort
                        (
                                children,
                                new Comparator<String>() {
                                    public int compare(String lhs, String rhs) {
                                        return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                    }
                                }
                        );
                return children;
    
            } catch (ZkNoNodeException e) {
                client.createPersistent(basePath, true);
                return getSortedChildren();
    
            }
        }
    
        protected void releaseLock(String lockPath) throws Exception {
            deleteOurPath(lockPath);
        }
    
        /**
         * 尝试获取锁
         * @param timeOut
         * @param timeUnit
         * @return 锁节点的路径没有获取到锁返回null
         * @throws Exception
         */
        protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {
    
            long startMillis = System.currentTimeMillis();
            Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;
    
            String ourPath = null;
            boolean hasTheLock = false;
            boolean isDone = false;
            int retryCount = 0;
    
            //网络闪断需要重试一试
            while (!isDone) {
                isDone = true;
    
                try {
                    // 在/locker下创建临时的顺序节点
                    ourPath = createLockNode(client, path);
    
                    // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                    hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
                } catch (ZkNoNodeException e) {
                    if (retryCount++ < MAX_RETRY_COUNT) {
                        isDone = false;
                    } else {
                        throw e;
                    }
                }
            }
    
            if (hasTheLock) {
                return ourPath;
            }
    
            return null;
        }
    
        private void deleteOurPath(String ourPath) throws Exception {
            client.delete(ourPath);
        }
    
        private String createLockNode(ZkClient client, String path) throws Exception {
            // 创建临时循序节点
            return client.createEphemeralSequential(path, null);
        }
    
        private String getLockNodeNumber(String str, String lockName) {
            int index = str.lastIndexOf(lockName);
            if (index >= 0) {
                index += lockName.length();
                return index <= str.length() ? str.substring(index) : "";
            }
            return str;
        }
    }
    

    pom依赖:

    <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.6</version>
            </dependency>
    
            <!-- ZkClient -->
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.9</version>
            </dependency>
    

    测试:

    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
    
    public class LockTest {
        public static final String ZK_CONNECT_STRING="47.99.45.243:2181";
    
        public static void main(String[] args) {
    
            // 需要手动创建节点 /locker
    
            ZkClient zkClient1 = new ZkClient(ZK_CONNECT_STRING, 5000,
                    5000, new BytesPushThroughSerializer());
            ZKDistributeLock lock1 = new ZKDistributeLock(zkClient1, "/locker");
    
            ZkClient zkClient2 = new ZkClient(ZK_CONNECT_STRING, 5000,
                    5000, new BytesPushThroughSerializer());
            final ZKDistributeLock lock2 = new ZKDistributeLock(zkClient2, "/locker");
    
            try {
                lock1.getLock();
                System.out.println("Client1 is get lock!");
                Thread client2Thd = new Thread(new Runnable() {
    
                    public void run() {
                        try {
                            lock2.getLock();
    //                        lock2.getLock(500, TimeUnit.SECONDS);
                            System.out.println("Client2 is get lock");
                            lock2.releaseLock();
                            System.out.println("Client2 is released lock");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                client2Thd.start();
    
                // 5s 后lock1释放锁
                Thread.sleep(5000);
                lock1.releaseLock();
                System.out.println("Client1 is released lock");
    
                client2Thd.join();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    

    输出结果:

    Client1 is get lock!
    Client1 is released lock
    Client2 is get lock
    Client2 is released lock
    

    基于zookeeper的分布式共享锁:

    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 基于zookeeper的分布式共享锁 信号量(有缺陷,因为实际上应该是前面5个,任意一个节点被删除,都应该受到watchEvent通知,
     * 但是这样的话,代码难以控制,
     * ******也可以超时后(这个节点一直任务比较长,一直不释放锁,一直不释放锁的话,前面的已经释放,最终他会成为
     * index=0的节点),往后面寻找),但是这里暂时不实现,只实现简单版本的共享锁,这种有可能死锁*******
     * 假设共享锁持有的permit许可,有5张
     * 加锁: 先生成临时顺序节点,然后获得basePath下的所有孩子节点。从小到大排序
     * ourlockPath 先获得。 然后previous5Index = ourLockPathIndex -5 < 0 说明是获得锁,
     * 如果大于等于0 ,说明是没获得,等待。注册监听器,监听前面第5个节点,dataChange的时候,说明节点删除,
     */
    public class ZKDistributeSemaphore {
    
        private final ZkClient client;
        private final String path;
        private final String basePath;
        private final String lockName;
        private String ourLockPath;
        private  int permits =5;
    
        /** 重试获取锁次数 */
        private static final Integer MAX_RETRY_COUNT = 10;
        private static final String LOCK_NAME = "lock-";
    
        public ZKDistributeSemaphore(ZkClient client, String basePath,int permits) {
            this.client = client;
            this.basePath = basePath;
            this.path = basePath.concat("/").concat(LOCK_NAME);
            this.lockName = LOCK_NAME;
            this.permits = permits;
        }
    
        public void getLock() throws Exception {
            // -1 表示永不超时
            ourLockPath = tryGetLock(-1, null);
            if(ourLockPath == null){
                throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
            }
        }
    
        public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
            ourLockPath = tryGetLock(timeOut, timeUnit);
            return ourLockPath != null;
        }
    
        public void releaseLock() throws Exception {
            releaseLock(ourLockPath);
        }
    
        /**
         * 等待获取锁
         * @param startMillis
         * @param millisToWait
         * @param ourPath
         * @return
         * @throws Exception
         */
        private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {
    
            // 是否得到锁
            boolean haveTheLock = false;
            // 是否需要删除当前锁的节点
            boolean doDeleteOurPath = false;
    
            try {
    
                while (!haveTheLock) {
    
                    // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                    List<String> children = getSortedChildren();
    
                    // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                    String sequenceNodeName = ourPath.substring(basePath.length() + 1);
    
                    // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                    int ourIndex = children.indexOf(sequenceNodeName);
                    if (ourIndex < 0) {
                        // 可能网络闪断 抛给上层处理
                        throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                    }
    
                    boolean isGetTheLock = (ourIndex-permits<0);
    
                    if (isGetTheLock) {
                        // 如果第一位 已经获得锁
                        haveTheLock = true;
                    } else {
                        // 如果没有拿到锁,就监听前面第5个节点
                        String pathToWatch = children.get(ourIndex - permits);
                        String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                        final CountDownLatch latch = new CountDownLatch(1);
                        final IZkDataListener previousListener = new IZkDataListener() {
    
                            public void handleDataDeleted(String dataPath) throws Exception {
                                latch.countDown();
                            }
    
                            public void handleDataChange(String dataPath, Object data) throws Exception {
                            }
                        };
    
                        try {
                            client.subscribeDataChanges(previousSequencePath, previousListener);
    
                            if (millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait <= 0) {
                                    doDeleteOurPath = true;
                                    break;
                                }
    
                                latch.await(millisToWait, TimeUnit.MICROSECONDS);
                            } else {
                                latch.await();
                            }
                        } catch (ZkNoNodeException e) {
                            e.printStackTrace();
                        } finally {
                            client.unsubscribeDataChanges(previousSequencePath, previousListener);
                        }
    
                    }
                }
            } catch (Exception e) {
                //发生异常需要删除节点
                doDeleteOurPath = true;
                throw e;
            } finally {
                //如果需要删除节点
                if (doDeleteOurPath) {
                    deleteOurPath(ourPath);
                }
            }
    
            return haveTheLock;
        }
    
        /**
         * 获取所有锁节点(/locker下的子节点)并排序
         *
         * @return
         * @throws Exception
         */
        private List<String> getSortedChildren() throws Exception {
            try {
    
                List<String> children = client.getChildren(basePath);
                Collections.sort
                        (
                                children,
                                new Comparator<String>() {
                                    public int compare(String lhs, String rhs) {
                                        return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                    }
                                }
                        );
                return children;
    
            } catch (ZkNoNodeException e) {
                client.createPersistent(basePath, true);
                return getSortedChildren();
    
            }
        }
    
        protected void releaseLock(String lockPath) throws Exception {
            deleteOurPath(lockPath);
        }
    
        /**
         * 尝试获取锁
         * @param timeOut
         * @param timeUnit
         * @return 锁节点的路径没有获取到锁返回null
         * @throws Exception
         */
        protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {
    
            long startMillis = System.currentTimeMillis();
            Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;
    
            String ourPath = null;
            boolean hasTheLock = false;
            boolean isDone = false;
            int retryCount = 0;
    
            //网络闪断需要重试一试
            while (!isDone) {
                isDone = true;
    
                try {
                    // 在/locker下创建临时的顺序节点
                    ourPath = createLockNode(client, path);
    
                    // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                    hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
                } catch (ZkNoNodeException e) {
                    if (retryCount++ < MAX_RETRY_COUNT) {
                        isDone = false;
                    } else {
                        throw e;
                    }
                }
            }
    
            if (hasTheLock) {
                return ourPath;
            }
    
            return null;
        }
    
        private void deleteOurPath(String ourPath) throws Exception {
            client.delete(ourPath);
        }
    
        private String createLockNode(ZkClient client, String path) throws Exception {
            // 创建临时循序节点
            return client.createEphemeralSequential(path, null);
        }
    
        private String getLockNodeNumber(String str, String lockName) {
            int index = str.lastIndexOf(lockName);
            if (index >= 0) {
                index += lockName.length();
                return index <= str.length() ? str.substring(index) : "";
            }
            return str;
        }
    }
    

    这个没有测试过。

    相关文章

      网友评论

          本文标题:分布式环境下的独占锁,共享锁

          本文链接:https://www.haomeiwen.com/subject/tfkauqtx.html