分布式情况下, 比较难的一个问题是如何保证数据的最终唯一. 在单进程情况下, 多线程操作数据, 我们只需要保证代码同步即可, 同一时间只允许一个线程来操作数据, 通过 Java 提供的并发 API 我们可以解决. 但是在分布式情况下, 无法使用此方法来解决, 因为在分布式情况下, 我们面对的不是多线程, 而是多进程, 也许多个服务都不在同一个物理机器上运行, 进程与进程之间无法互相同步数据.
同一个进程中使用锁之所以可以解决多线程数据同步的问题, 是因为我们可以在进程中设置一个标记, 而这个标记所有的线程都可以访问的到.
但是在多进程中, 无法在A进程中设置一个标记让其他的不同进程里的线程都能访问的到. 所以也就无法实现数据唯一.
那么问题来了, 只要有一个可以让所有的进程都能访问到的数据, 就可以实现数据同步, 数据唯一.
zk分布式锁原理
通过临时节点, 临时节点特征:会话连接结束后, 节点会自动删除
1.多个服务器在zk上创建同一个临时节点, 临时节点不允许重复
2.谁能成功创建临时节点, 谁就拿到锁, 其他服务没有在zk上创建临时节点成功, 处于等待状态.
3.通过事件通知, 监听到节点被删除之后, 其他服务又开始抢着去创建临时节点, 创建成功, 便获取锁.
创建连接获取锁, 关闭连接释放锁.
zookeeper就是那个所有的进程线程都可以访问到的数据, 由zookeeper来统一调度管理分布式锁.
引入pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
定义锁服务
//##lock 锁 定义分布式锁
public interface Lock {
//获取锁
void getLock();
//釋放鎖
void unLock();
}
定义一个抽象类ZookeeperAbstractLock实现Lock接口, 该类主要是进行获取锁服务和解锁
//抽象类是类的一种, 抽象类不能实例化对象
//重构重复代码,将重复代码交给子类执行
public abstract class ZookeeperAbstractLock implements Lock {
// zk连接地址
private static final String CONNECTSTRING = "127.0.0.1:2181";
// 创建zk连接
protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
protected static final String PATH = "/lock";
protected CountDownLatch countDownLatch = null;
public void getLock() {
if (tryLock()) {
System.out.println("###获取锁成功#####");
} else {
// 等待
waitLock();
// 重新获取锁
getLock();
}
}
// 是否获取锁成功,成功返回true 失败返回fasle
abstract Boolean tryLock();
// 等待
abstract void waitLock();
public void unLock() {
if (zkClient != null) {
zkClient.close();
System.out.println("释放锁资源, 关闭连接");
System.out.println();
}
}
}
ZookeeperDistrbuteLock主要是进项节点创建, 节点监听, 节点删除
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
@Override
Boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
return false;
}
}
@Override
void waitLock() {
// 使用事件监听,获取到节点被删除,
IZkDataListener iZkDataListener = new IZkDataListener() {
// 当节点被删除
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null) {
// 唤醒
countDownLatch.countDown();
}
}
// 当节点发生改变
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 注册节点信息
zkClient.subscribeDataChanges(PATH, iZkDataListener);
if (zkClient.exists(PATH)) {
// 创建信号量
countDownLatch = new CountDownLatch(1);
try {
// 等待
countDownLatch.await();
} catch (Exception e) {
}
}
// 删除事件通知, 当await时, 代码不会继续往下走, 当countDown时, 代码才会继续往下走
zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
}
}
由于条件限制, 只进行多线程测试
public class Service implements Runnable {
private Lock lock = new ZookeeperDistrbuteLock();
public void run() {
// 这里上锁, 假如
try {
// 上锁
lock.getLock();
// 模拟用户生成订单号
getNumber();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖資源
lock.unLock();
}
}
public String getNumber() {
try {
Thread.sleep(200);
} catch (Exception e) {
// TODO: handle exception
}
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return simpt.format(new Date());
}
public static void main(String[] args) {
System.out.println("##模拟生成订单号开始...");
/**
* 此处不能使用创建单个service, 这样的话所有的service对象使用同一个zk客户端连接zk,
* 当某一个对象关闭了连接之后, zk客户端与zk直间的连接就被关闭了, 就不能被连接了.
* zk是只有一个, 连接zk的客户端却有好几个
* java.lang.IllegalStateException: ZkClient already closed!
* Service service = new Service();
* new Thread(service).start();
*/
for (int i = 0; i < 100; i++) {
new Thread(new Service()).start();
}
}
}
网友评论