基于zookeeper分布式环境下的锁,原理大概是:zookeeper的临时顺序节点。当session结束的时候,临时节点自动删除。发送watchEvent通知给客户端。然后占据临时顺序节点,索引为0的地方,这个进程就占有锁。其他的等待。当锁释放的时候,节点删除。后面一个变成锁的拥有者。主要是利用了zookeeper的临时顺序节点的原子递增性。基于zookeeper的分布式唯一id生成器,也是这个原理。
基于zookeeper的分布式独占锁
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 基于zookeeper的分布式独占锁
*/
public class ZKDistributeLock {
private final ZkClient client;
private final String path;
private final String basePath;
private final String lockName;
private String ourLockPath;
/** 重试获取锁次数 */
private static final Integer MAX_RETRY_COUNT = 10;
private static final String LOCK_NAME = "lock-";
public ZKDistributeLock(ZkClient client, String basePath) {
this.client = client;
this.basePath = basePath;
this.path = basePath.concat("/").concat(LOCK_NAME);
this.lockName = LOCK_NAME;
}
public void getLock() throws Exception {
// -1 表示永不超时
ourLockPath = tryGetLock(-1, null);
if(ourLockPath == null){
throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
}
}
public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
ourLockPath = tryGetLock(timeOut, timeUnit);
return ourLockPath != null;
}
public void releaseLock() throws Exception {
releaseLock(ourLockPath);
}
/**
* 等待获取锁
* @param startMillis
* @param millisToWait
* @param ourPath
* @return
* @throws Exception
*/
private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {
// 是否得到锁
boolean haveTheLock = false;
// 是否需要删除当前锁的节点
boolean doDeleteOurPath = false;
try {
while (!haveTheLock) {
// 获取所有锁节点(/locker下的子节点)并排序(从小到大)
List<String> children = getSortedChildren();
// 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0) {
// 可能网络闪断 抛给上层处理
throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
}
boolean isGetTheLock = (ourIndex == 0);
if (isGetTheLock) {
// 如果第一位 已经获得锁
haveTheLock = true;
} else {
// 如果不是第一位,监听比自己小的那个节点的删除事件
String pathToWatch = children.get(ourIndex - 1);
String previousSequencePath = basePath.concat("/").concat(pathToWatch);
final CountDownLatch latch = new CountDownLatch(1);
final IZkDataListener previousListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
latch.countDown();
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
try {
client.subscribeDataChanges(previousSequencePath, previousListener);
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDeleteOurPath = true;
break;
}
latch.await(millisToWait, TimeUnit.MICROSECONDS);
} else {
latch.await();
}
} catch (ZkNoNodeException e) {
e.printStackTrace();
} finally {
client.unsubscribeDataChanges(previousSequencePath, previousListener);
}
}
}
} catch (Exception e) {
//发生异常需要删除节点
doDeleteOurPath = true;
throw e;
} finally {
//如果需要删除节点
if (doDeleteOurPath) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
/**
* 获取所有锁节点(/locker下的子节点)并排序
*
* @return
* @throws Exception
*/
private List<String> getSortedChildren() throws Exception {
try {
List<String> children = client.getChildren(basePath);
Collections.sort
(
children,
new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
}
}
);
return children;
} catch (ZkNoNodeException e) {
client.createPersistent(basePath, true);
return getSortedChildren();
}
}
protected void releaseLock(String lockPath) throws Exception {
deleteOurPath(lockPath);
}
/**
* 尝试获取锁
* @param timeOut
* @param timeUnit
* @return 锁节点的路径没有获取到锁返回null
* @throws Exception
*/
protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
//网络闪断需要重试一试
while (!isDone) {
isDone = true;
try {
// 在/locker下创建临时的顺序节点
ourPath = createLockNode(client, path);
// 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
} catch (ZkNoNodeException e) {
if (retryCount++ < MAX_RETRY_COUNT) {
isDone = false;
} else {
throw e;
}
}
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private void deleteOurPath(String ourPath) throws Exception {
client.delete(ourPath);
}
private String createLockNode(ZkClient client, String path) throws Exception {
// 创建临时循序节点
return client.createEphemeralSequential(path, null);
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if (index >= 0) {
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
pom依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- ZkClient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
</dependency>
测试:
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
public class LockTest {
public static final String ZK_CONNECT_STRING="47.99.45.243:2181";
public static void main(String[] args) {
// 需要手动创建节点 /locker
ZkClient zkClient1 = new ZkClient(ZK_CONNECT_STRING, 5000,
5000, new BytesPushThroughSerializer());
ZKDistributeLock lock1 = new ZKDistributeLock(zkClient1, "/locker");
ZkClient zkClient2 = new ZkClient(ZK_CONNECT_STRING, 5000,
5000, new BytesPushThroughSerializer());
final ZKDistributeLock lock2 = new ZKDistributeLock(zkClient2, "/locker");
try {
lock1.getLock();
System.out.println("Client1 is get lock!");
Thread client2Thd = new Thread(new Runnable() {
public void run() {
try {
lock2.getLock();
// lock2.getLock(500, TimeUnit.SECONDS);
System.out.println("Client2 is get lock");
lock2.releaseLock();
System.out.println("Client2 is released lock");
} catch (Exception e) {
e.printStackTrace();
}
}
});
client2Thd.start();
// 5s 后lock1释放锁
Thread.sleep(5000);
lock1.releaseLock();
System.out.println("Client1 is released lock");
client2Thd.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出结果:
Client1 is get lock!
Client1 is released lock
Client2 is get lock
Client2 is released lock
基于zookeeper的分布式共享锁:
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 基于zookeeper的分布式共享锁 信号量(有缺陷,因为实际上应该是前面5个,任意一个节点被删除,都应该受到watchEvent通知,
* 但是这样的话,代码难以控制,
* ******也可以超时后(这个节点一直任务比较长,一直不释放锁,一直不释放锁的话,前面的已经释放,最终他会成为
* index=0的节点),往后面寻找),但是这里暂时不实现,只实现简单版本的共享锁,这种有可能死锁*******
* 假设共享锁持有的permit许可,有5张
* 加锁: 先生成临时顺序节点,然后获得basePath下的所有孩子节点。从小到大排序
* ourlockPath 先获得。 然后previous5Index = ourLockPathIndex -5 < 0 说明是获得锁,
* 如果大于等于0 ,说明是没获得,等待。注册监听器,监听前面第5个节点,dataChange的时候,说明节点删除,
*/
public class ZKDistributeSemaphore {
private final ZkClient client;
private final String path;
private final String basePath;
private final String lockName;
private String ourLockPath;
private int permits =5;
/** 重试获取锁次数 */
private static final Integer MAX_RETRY_COUNT = 10;
private static final String LOCK_NAME = "lock-";
public ZKDistributeSemaphore(ZkClient client, String basePath,int permits) {
this.client = client;
this.basePath = basePath;
this.path = basePath.concat("/").concat(LOCK_NAME);
this.lockName = LOCK_NAME;
this.permits = permits;
}
public void getLock() throws Exception {
// -1 表示永不超时
ourLockPath = tryGetLock(-1, null);
if(ourLockPath == null){
throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
}
}
public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
ourLockPath = tryGetLock(timeOut, timeUnit);
return ourLockPath != null;
}
public void releaseLock() throws Exception {
releaseLock(ourLockPath);
}
/**
* 等待获取锁
* @param startMillis
* @param millisToWait
* @param ourPath
* @return
* @throws Exception
*/
private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {
// 是否得到锁
boolean haveTheLock = false;
// 是否需要删除当前锁的节点
boolean doDeleteOurPath = false;
try {
while (!haveTheLock) {
// 获取所有锁节点(/locker下的子节点)并排序(从小到大)
List<String> children = getSortedChildren();
// 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
int ourIndex = children.indexOf(sequenceNodeName);
if (ourIndex < 0) {
// 可能网络闪断 抛给上层处理
throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
}
boolean isGetTheLock = (ourIndex-permits<0);
if (isGetTheLock) {
// 如果第一位 已经获得锁
haveTheLock = true;
} else {
// 如果没有拿到锁,就监听前面第5个节点
String pathToWatch = children.get(ourIndex - permits);
String previousSequencePath = basePath.concat("/").concat(pathToWatch);
final CountDownLatch latch = new CountDownLatch(1);
final IZkDataListener previousListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
latch.countDown();
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
try {
client.subscribeDataChanges(previousSequencePath, previousListener);
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDeleteOurPath = true;
break;
}
latch.await(millisToWait, TimeUnit.MICROSECONDS);
} else {
latch.await();
}
} catch (ZkNoNodeException e) {
e.printStackTrace();
} finally {
client.unsubscribeDataChanges(previousSequencePath, previousListener);
}
}
}
} catch (Exception e) {
//发生异常需要删除节点
doDeleteOurPath = true;
throw e;
} finally {
//如果需要删除节点
if (doDeleteOurPath) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
/**
* 获取所有锁节点(/locker下的子节点)并排序
*
* @return
* @throws Exception
*/
private List<String> getSortedChildren() throws Exception {
try {
List<String> children = client.getChildren(basePath);
Collections.sort
(
children,
new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
}
}
);
return children;
} catch (ZkNoNodeException e) {
client.createPersistent(basePath, true);
return getSortedChildren();
}
}
protected void releaseLock(String lockPath) throws Exception {
deleteOurPath(lockPath);
}
/**
* 尝试获取锁
* @param timeOut
* @param timeUnit
* @return 锁节点的路径没有获取到锁返回null
* @throws Exception
*/
protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
//网络闪断需要重试一试
while (!isDone) {
isDone = true;
try {
// 在/locker下创建临时的顺序节点
ourPath = createLockNode(client, path);
// 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
} catch (ZkNoNodeException e) {
if (retryCount++ < MAX_RETRY_COUNT) {
isDone = false;
} else {
throw e;
}
}
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private void deleteOurPath(String ourPath) throws Exception {
client.delete(ourPath);
}
private String createLockNode(ZkClient client, String path) throws Exception {
// 创建临时循序节点
return client.createEphemeralSequential(path, null);
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if (index >= 0) {
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
这个没有测试过。
网友评论