美文网首页
分布式环境下的独占锁,共享锁

分布式环境下的独占锁,共享锁

作者: sadamu0912 | 来源:发表于2019-03-05 20:50 被阅读0次

基于zookeeper分布式环境下的锁,原理大概是:zookeeper的临时顺序节点。当session结束的时候,临时节点自动删除。发送watchEvent通知给客户端。然后占据临时顺序节点,索引为0的地方,这个进程就占有锁。其他的等待。当锁释放的时候,节点删除。后面一个变成锁的拥有者。主要是利用了zookeeper的临时顺序节点的原子递增性。基于zookeeper的分布式唯一id生成器,也是这个原理。

基于zookeeper的分布式独占锁

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 基于zookeeper的分布式独占锁
 */
public class ZKDistributeLock {

    private final ZkClient client;
    private final String path;
    private final String basePath;
    private final String lockName;
    private String ourLockPath;

    /** 重试获取锁次数 */
    private static final Integer MAX_RETRY_COUNT = 10;
    private static final String LOCK_NAME = "lock-";

    public ZKDistributeLock(ZkClient client, String basePath) {
        this.client = client;
        this.basePath = basePath;
        this.path = basePath.concat("/").concat(LOCK_NAME);
        this.lockName = LOCK_NAME;
    }

    public void getLock() throws Exception {
        // -1 表示永不超时
        ourLockPath = tryGetLock(-1, null);
        if(ourLockPath == null){
            throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
        }
    }

    public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
        ourLockPath = tryGetLock(timeOut, timeUnit);
        return ourLockPath != null;
    }

    public void releaseLock() throws Exception {
        releaseLock(ourLockPath);
    }

    /**
     * 等待获取锁
     * @param startMillis
     * @param millisToWait
     * @param ourPath
     * @return
     * @throws Exception
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {

        // 是否得到锁
        boolean haveTheLock = false;
        // 是否需要删除当前锁的节点
        boolean doDeleteOurPath = false;

        try {

            while (!haveTheLock) {

                // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                List<String> children = getSortedChildren();

                // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) {
                    // 可能网络闪断 抛给上层处理
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                boolean isGetTheLock = (ourIndex == 0);

                if (isGetTheLock) {
                    // 如果第一位 已经获得锁
                    haveTheLock = true;
                } else {
                    // 如果不是第一位,监听比自己小的那个节点的删除事件
                    String pathToWatch = children.get(ourIndex - 1);
                    String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {

                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDeleteOurPath = true;
                                break;
                            }

                            latch.await(millisToWait, TimeUnit.MICROSECONDS);
                        } else {
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        e.printStackTrace();
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }

                }
            }
        } catch (Exception e) {
            //发生异常需要删除节点
            doDeleteOurPath = true;
            throw e;
        } finally {
            //如果需要删除节点
            if (doDeleteOurPath) {
                deleteOurPath(ourPath);
            }
        }

        return haveTheLock;
    }

    /**
     * 获取所有锁节点(/locker下的子节点)并排序
     *
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {

            List<String> children = client.getChildren(basePath);
            Collections.sort
                    (
                            children,
                            new Comparator<String>() {
                                public int compare(String lhs, String rhs) {
                                    return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                }
                            }
                    );
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(basePath, true);
            return getSortedChildren();

        }
    }

    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    /**
     * 尝试获取锁
     * @param timeOut
     * @param timeUnit
     * @return 锁节点的路径没有获取到锁返回null
     * @throws Exception
     */
    protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {

        long startMillis = System.currentTimeMillis();
        Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        //网络闪断需要重试一试
        while (!isDone) {
            isDone = true;

            try {
                // 在/locker下创建临时的顺序节点
                ourPath = createLockNode(client, path);

                // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return ourPath;
        }

        return null;
    }

    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    private String createLockNode(ZkClient client, String path) throws Exception {
        // 创建临时循序节点
        return client.createEphemeralSequential(path, null);
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

pom依赖:

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

        <!-- ZkClient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.9</version>
        </dependency>

测试:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class LockTest {
    public static final String ZK_CONNECT_STRING="47.99.45.243:2181";

    public static void main(String[] args) {

        // 需要手动创建节点 /locker

        ZkClient zkClient1 = new ZkClient(ZK_CONNECT_STRING, 5000,
                5000, new BytesPushThroughSerializer());
        ZKDistributeLock lock1 = new ZKDistributeLock(zkClient1, "/locker");

        ZkClient zkClient2 = new ZkClient(ZK_CONNECT_STRING, 5000,
                5000, new BytesPushThroughSerializer());
        final ZKDistributeLock lock2 = new ZKDistributeLock(zkClient2, "/locker");

        try {
            lock1.getLock();
            System.out.println("Client1 is get lock!");
            Thread client2Thd = new Thread(new Runnable() {

                public void run() {
                    try {
                        lock2.getLock();
//                        lock2.getLock(500, TimeUnit.SECONDS);
                        System.out.println("Client2 is get lock");
                        lock2.releaseLock();
                        System.out.println("Client2 is released lock");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client2Thd.start();

            // 5s 后lock1释放锁
            Thread.sleep(5000);
            lock1.releaseLock();
            System.out.println("Client1 is released lock");

            client2Thd.join();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

输出结果:

Client1 is get lock!
Client1 is released lock
Client2 is get lock
Client2 is released lock

基于zookeeper的分布式共享锁:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 基于zookeeper的分布式共享锁 信号量(有缺陷,因为实际上应该是前面5个,任意一个节点被删除,都应该受到watchEvent通知,
 * 但是这样的话,代码难以控制,
 * ******也可以超时后(这个节点一直任务比较长,一直不释放锁,一直不释放锁的话,前面的已经释放,最终他会成为
 * index=0的节点),往后面寻找),但是这里暂时不实现,只实现简单版本的共享锁,这种有可能死锁*******
 * 假设共享锁持有的permit许可,有5张
 * 加锁: 先生成临时顺序节点,然后获得basePath下的所有孩子节点。从小到大排序
 * ourlockPath 先获得。 然后previous5Index = ourLockPathIndex -5 < 0 说明是获得锁,
 * 如果大于等于0 ,说明是没获得,等待。注册监听器,监听前面第5个节点,dataChange的时候,说明节点删除,
 */
public class ZKDistributeSemaphore {

    private final ZkClient client;
    private final String path;
    private final String basePath;
    private final String lockName;
    private String ourLockPath;
    private  int permits =5;

    /** 重试获取锁次数 */
    private static final Integer MAX_RETRY_COUNT = 10;
    private static final String LOCK_NAME = "lock-";

    public ZKDistributeSemaphore(ZkClient client, String basePath,int permits) {
        this.client = client;
        this.basePath = basePath;
        this.path = basePath.concat("/").concat(LOCK_NAME);
        this.lockName = LOCK_NAME;
        this.permits = permits;
    }

    public void getLock() throws Exception {
        // -1 表示永不超时
        ourLockPath = tryGetLock(-1, null);
        if(ourLockPath == null){
            throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
        }
    }

    public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
        ourLockPath = tryGetLock(timeOut, timeUnit);
        return ourLockPath != null;
    }

    public void releaseLock() throws Exception {
        releaseLock(ourLockPath);
    }

    /**
     * 等待获取锁
     * @param startMillis
     * @param millisToWait
     * @param ourPath
     * @return
     * @throws Exception
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {

        // 是否得到锁
        boolean haveTheLock = false;
        // 是否需要删除当前锁的节点
        boolean doDeleteOurPath = false;

        try {

            while (!haveTheLock) {

                // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                List<String> children = getSortedChildren();

                // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) {
                    // 可能网络闪断 抛给上层处理
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                boolean isGetTheLock = (ourIndex-permits<0);

                if (isGetTheLock) {
                    // 如果第一位 已经获得锁
                    haveTheLock = true;
                } else {
                    // 如果没有拿到锁,就监听前面第5个节点
                    String pathToWatch = children.get(ourIndex - permits);
                    String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {

                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDeleteOurPath = true;
                                break;
                            }

                            latch.await(millisToWait, TimeUnit.MICROSECONDS);
                        } else {
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        e.printStackTrace();
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }

                }
            }
        } catch (Exception e) {
            //发生异常需要删除节点
            doDeleteOurPath = true;
            throw e;
        } finally {
            //如果需要删除节点
            if (doDeleteOurPath) {
                deleteOurPath(ourPath);
            }
        }

        return haveTheLock;
    }

    /**
     * 获取所有锁节点(/locker下的子节点)并排序
     *
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {

            List<String> children = client.getChildren(basePath);
            Collections.sort
                    (
                            children,
                            new Comparator<String>() {
                                public int compare(String lhs, String rhs) {
                                    return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                }
                            }
                    );
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(basePath, true);
            return getSortedChildren();

        }
    }

    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    /**
     * 尝试获取锁
     * @param timeOut
     * @param timeUnit
     * @return 锁节点的路径没有获取到锁返回null
     * @throws Exception
     */
    protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {

        long startMillis = System.currentTimeMillis();
        Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        //网络闪断需要重试一试
        while (!isDone) {
            isDone = true;

            try {
                // 在/locker下创建临时的顺序节点
                ourPath = createLockNode(client, path);

                // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return ourPath;
        }

        return null;
    }

    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    private String createLockNode(ZkClient client, String path) throws Exception {
        // 创建临时循序节点
        return client.createEphemeralSequential(path, null);
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

这个没有测试过。

相关文章

网友评论

      本文标题:分布式环境下的独占锁,共享锁

      本文链接:https://www.haomeiwen.com/subject/tfkauqtx.html