zookeeper
- 节点特性
1.同级节点的唯一性
2.临时节点和持久节点
3.有序节点和无序节点
4.临时节点下不能存在子节点 - 节点详情
cZxid 创建节点时的事务ID
ctime 创建节点时的时间
mZxid 最后修改节点时的事务ID
mtime 最后修改节点时的时间
pZxid 表示该节点的子节点列表最后一次修改的事务ID,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该ID
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralOwner 如果是临时节点则内容为绑定的当前sessionID,否则为0
dataLength 该节点的数据长度
numChildren 该节点拥有子节点的数量 - 读写操作
读:可以在任意节点去读取数据
写:将请求转发到leader上处理
- 分布式锁
package com.simon.lock.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @Author: lim
* @Description:
* @Date: 2018/11/23.
*/
public class ZookeeperLockUtil implements Watcher{
/**
* zookeeper集群地址
*/
private static final String zookeeperConfig = "192.168.33.200:2181,192.168.33.201:2181,192.168.33.203:2181";
/**
* 持久根节点
*/
private String LOCK_ROOT= "/LockRoot";
/**
* 竞争的资源
*/
private String lockName;
/**
* 当前节点
*/
private String CURRENT_LOCK;
/**
* 等待的节点
*/
private String WAIT_LOCK;
/**
* 等待超时时间
*/
private Long waitTimeout = 5000L;
private CountDownLatch countDownLatch;
/**
* 保证连接后,再去尝试创建节点
*/
private CountDownLatch createCountDownLatch = new CountDownLatch(1);
private ZooKeeper zooKeeper;
/**
* 初始化根节点
* @param lockName 争夺的锁资源
*/
public ZookeeperLockUtil(String lockName) {
this.lockName = lockName;
try {
zooKeeper = new ZooKeeper(zookeeperConfig,50000,this);
createCountDownLatch.await();
Stat stat = zooKeeper.exists(LOCK_ROOT, this);
if(stat == null){
zooKeeper.create(LOCK_ROOT,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 锁
* @return
*/
public boolean lock(){
if(tryLock()){
return true;
}
return waitForLock(WAIT_LOCK,waitTimeout);
}
/**
* 等待锁
* @param wait_lock
* @param waitTimeout
* @return
*/
private boolean waitForLock(String wait_lock, Long waitTimeout) {
try {
Stat stat = zooKeeper.exists(LOCK_ROOT + "/" + wait_lock, true);
if(stat != null){
countDownLatch = new CountDownLatch(1);
countDownLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
countDownLatch = null;
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 尝试获取锁
* @return
*/
public boolean tryLock(){
try {
//创建临时有序节点
CURRENT_LOCK = zooKeeper.create(LOCK_ROOT + "/" + lockName,new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
//System.out.println(Thread.currentThread().getName()+"创建节点"+CURRENT_LOCK);
//判断当前节点是否为第一个节点
List<String> childs = zooKeeper.getChildren(LOCK_ROOT, false);
//System.out.println(Thread.currentThread().getName()+"获取子节点个数"+childs.size());
List<String> sortLocks = new ArrayList<>();
for(String child : childs){
String nodePrefix = child.substring(0,lockName.length());
if(lockName.equals(nodePrefix)){
sortLocks.add(child);
}
}
Collections.sort(sortLocks);
//System.out.println(Thread.currentThread().getName()+"获取锁"+CURRENT_LOCK);
//如果是则获取锁成功
if(CURRENT_LOCK.equals(LOCK_ROOT + "/" + sortLocks.get(0))){
return true;
}
//如果不是,则获取当前节点的前一个
String preNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
WAIT_LOCK = sortLocks.get(Collections.binarySearch(sortLocks,preNode) - 1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 释放锁
* @return
*/
public boolean releaseLock(){
try {
zooKeeper.delete(CURRENT_LOCK,-1);
CURRENT_LOCK = null;
zooKeeper.close();
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
/**
* 监听事件
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
createCountDownLatch.countDown();
}
if(countDownLatch != null){
countDownLatch.countDown();
}
}
}
测试
package com.simon;
import com.simon.lock.zookeeper.ZookeeperLockUtil;
/**
* @Author: lim
* @Description:
* @Date: 2018/11/23.
*/
public class ZookeeperLockTest {
static int counter = 10;
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
ZookeeperLockUtil zookeeperLockUtil = null;
try {
zookeeperLockUtil = new ZookeeperLockUtil("demo");
if(zookeeperLockUtil.lock()){
counter--;
System.out.println(Thread.currentThread().getName()+"=="+counter);
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(zookeeperLockUtil != null){
zookeeperLockUtil.releaseLock();
}
}
}
};
for(int i = 0;i < 10;i++){
Thread t = new Thread(runnable);
t.start();
}
}
}
网友评论