工作需要。写了一个基于ZK的分布式锁,记录一下:
原理
zk能保证集群上的路径同一时刻只有一个客户端来创建。因此,通过在集群上顺序创建和删除临时路径,在实现分布式锁的获取和释放。
代码
zk上有一个客户端框架Curator
已经对分布式互斥锁进行了封装,几乎是开箱即用:
- 封装一下框架的初始化
public class ZKCuratorManager {
private static InterProcessMutex lock;
private static CuratorFramework cf;
private static String zkAddr = "*.*.*.*:2181";
private static String lockPath = "/distribute-lock";
static {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
cf = CuratorFrameworkFactory.builder()
.connectString(zkAddr)
.sessionTimeoutMs(2000)
.retryPolicy(retryPolicy)
.build();
cf.start();
}
public static InterProcessMutex getLock() {
lock = new InterProcessMutex(cf, lockPath);
return lock;
}
}
- 封装一下工具类
public class ZKCuratorLockUtil {
/**
* 从配置类中获取分布式锁对象
*/
private static InterProcessMutex lock = ZKCuratorManager.getLock();
/**
* 加锁
*
* @return
*/
public static boolean acquire() {
try {
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 锁的释放
*/
public static void release() {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
测试
- 所有线程启动后sleep5秒钟
- 用CyclicBarrier使四个线程同时尝试获取锁
- 结果应该是四个线程依次获取-释放锁
public class ZkLockTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++) {
new WriterTest(barrier).start();
}
System.out.println("END");
}
static class WriterTest extends Thread {
private CyclicBarrier cyclicBarrier;
public WriterTest(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
//以睡眠来模拟写入数据操作
Thread.sleep(5000);
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
//加锁
ZKCuratorLockUtil.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "获得分布式锁");
try {
Thread.sleep(2000);
ZKCuratorLockUtil.release();
System.out.println("线程" + Thread.currentThread().getName() + "释放分布式锁");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("END");
}
}
}
- 测试结果
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
END
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
线程Thread-3获得分布式锁
线程Thread-3释放分布式锁
END
线程Thread-1获得分布式锁
线程Thread-1释放分布式锁
END
线程Thread-2获得分布式锁
线程Thread-2释放分布式锁
END
线程Thread-0获得分布式锁
线程Thread-0释放分布式锁
END
网友评论