一、ZooKeeper Curator
1、创建会话
public class Create_Session_Sample {
public static void main(String[] args) throws Exception{
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
/*CuratorFrameworkFactory.newClient("domain1.book.zookeeper:2181",
5000,
3000,
retryPolicy);*/
//Fluent风格
CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")//隔离命名空间/base
.build();
client.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
2、创建节点
使用Curator的同步接口
public class Create_Node_Sample {
static String path = "/zk-book/c1";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();//jianl
client.create()
.creatingParentsIfNeeded()//创建子节点时没有父节点则创建
.withMode(CreateMode.EPHEMERAL)//临时节点
.forPath(path, "init".getBytes());
}
}
注意:zk的非叶子节点都必须为持久节点,因此此处c1为临时节点,而父节点为持久节点
使用Curator的异步接口
public class Create_Node_Background_Sample {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
static CountDownLatch semaphore = new CountDownLatch(2);
static ExecutorService tp = Executors.newFixedThreadPool(2);//自定义的Executor
public static void main(String[] args) throws Exception {
client.start();
System.out.println("Main thread: " + Thread.currentThread().getName());
// 此处传入了自定义的Executor
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
System.out.println("Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}, tp).forPath(path, "init".getBytes());
// 此处没有传入自定义的Executor
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
System.out.println("Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}).forPath(path, "init".getBytes());
semaphore.await();//等待异步完成
tp.shutdown();
}
}
注:Zookeeper中所有异步通知事件都会通过默认EventThread线程来处理,而EventThread线程串行处理所有事件。 为了避免执行事件时间过长,影响其他事件执行,可以通过自定义ExecutorService的执行事件。
inBackground(BackgroundCallback arg0, Executor arg1)
3、删除节点
//使用Curator删除节点
public class Del_Data_Sample {
static String path = "/zk-book/c1";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
client.delete().deletingChildrenIfNeeded()
.withVersion(stat.getVersion()).forPath(path);
}
}
storingStatIn 把服务器端获取的状态数据存储到stat对象
deletingChildrenIfNeeded 递归删除子节点
withVersion 按版本号删除
client.delete().guaranteed().forPath(path); 强制删除,只有会话有效,就会持续删除,直到删除。
4、获取节点数据
使用Curator获取数据内容
public class Get_Data_Sample {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
Stat stat = new Stat();
System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
}
}
5、更新数据
//使用Curator更新数据内容
public class Set_Data_Sample {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
client.delete().deletingChildrenIfNeeded().forPath( path );
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("Success set node for : " + path + ", new version: "
+ client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
try {
client.setData().withVersion(stat.getVersion()).forPath(path);
} catch (Exception e) {
//版本过期,抛出异常KeepErrorCode=BadVersion
System.out.println("Fail set node due to " + e.getMessage());
}
}
}
6、事件监听
Curator引入NCache来实现对Zookeeper服务端事件的监听,不需要原生那样反复注册Watcher。
Cache分为节点监听NodeCache和子节点监听PathChildrenCache。
#NodeCache 监听节点内容和节点是否存在
public class NodeCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "init".getBytes());
final NodeCache cache = new NodeCache(client,path,false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {//添加NodeCacheListener
@Override
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " +
new String(cache.getCurrentData().getData()));
}
});
client.setData().forPath( path, "u".getBytes() );
Thread.sleep( 1000 );
client.delete().deletingChildrenIfNeeded().forPath( path );
Thread.sleep( Integer.MAX_VALUE );
}
}
注:new NodeCache(client,path,false) 默认设置false,如果设置true,则NodeCache在第一次启动
时会立刻从Zookeeper上读取对应节点的数据,保存在NodeCache中。
NodeCache可以监听数据内容变化,也可以监听节点是否存在。如果节点不存在,创建节点之后会触发NodeCacheListener。但删除节点则无法触发NodeCacheListener
#PathChildrenCache 监听子节点新增、数据更新、删除
public class PathChildrenCache_Sample_ExecutorService {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.sessionTimeoutMs(5000)
.build();
static ExecutorService tp = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
client.start();
System.out.println( Thread.currentThread().getName() );
PathChildrenCache cache = new PathChildrenCache(client, path,true,false,tp);
cache.start(StartMode.NORMAL);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED," + event.getData().getPath());
System.out.println("CHILD_ADDED," + event.getData().getData());
System.out.println( "tname: " + Thread.currentThread().getName() );
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED," + event.getData().getPath());
System.out.println("CHILD_ADDED," + event.getData().getData());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED," + event.getData().getPath());
System.out.println("CHILD_ADDED," + event.getData().getData());
break;
default:
break;
}
}
});
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep( 1000 );
client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
Thread.sleep( 1000 );
client.delete().forPath(path+"/c1");
Thread.sleep( 1000 );
client.delete().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
注:
new PathChildrenCache(client, path,true,false,tp); 如果入参不传tp则使用默认的线程处理。

Cache调用start方法的模式
NORMAL: 初始时为空。
BUILD_INITIAL_CACHE: 在这个方法返回之前调用rebuild()。
POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
Cuartor无法对二级子节点进行事件的监听,如果对/zk/ci监听,那么当/zk/ci/c2节点创建或者删除时候是无法触发节点变化
7、master选举
原理:对接节点/master_select节点,多台机器同时在节点下创建子节点/master_select/lock,只有一个机器能创建成功,该机器也被选中为Master。一个机器成为Master之后,其他机器进入等待,直到该机器完成业务操作,释放Master权利。
public class Recipes_MasterSelect {
static String master_path = "/curator_recipes_master_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main( String[] args ) throws Exception {
client.start();
LeaderSelector selector = new LeaderSelector(client,
master_path,
new LeaderSelectorListenerAdapter() {
public void takeLeadership(CuratorFramework client) throws Exception {
//一个应用程序完成Master逻辑后,另一个程序的takeLeadership方法才被调用
System.out.println("成为Master角色");
Thread.sleep( 3000 );
System.out.println( "完成Master操作,释放Master权利" );
}
});
selector.autoRequeue();
selector.start();
Thread.sleep( Integer.MAX_VALUE );
}
}
原生API实现节点选举
public class TestMaster implements Watcher{
String serverId = Integer.toString(2);
private ZooKeeper zk;
private String hostPort;
private volatile boolean connected = false;
private volatile boolean expired = false;
boolean isLeader = false; // returns true if there is a master
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
void stopZK() throws InterruptedException, IOException {
zk.close();
}
boolean checkMaster() throws KeeperException, InterruptedException {
while (true) {
try {
Stat stat = new Stat();
byte data[] = zk.getData("/master", false, stat);
isLeader = new String(data).equals(serverId);//检查master节点是否由当前serverId创建
return true;
} catch (NoNodeException e) {
// no master, so try create again
return false;
} catch (ConnectionLossException e) {
}
}
}
void runForMaster() throws InterruptedException, KeeperException {
while (true) {
try {
zk.create("/master", serverId.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
isLeader = true;
takeLeadership();//获取Master节点,执行任务
break;
} catch (NodeExistsException e) {
isLeader = false;//master节点已经存在
break;
} catch (ConnectionLossException e) {
}
if (checkMaster()) break;
}
}
void takeLeadership() {
//getWorkers();
(new RecoveredAssignments(zk)).recover( new RecoveryCallback() {
public void recoveryComplete(int rc, List<String> tasks) {
if(rc == RecoveryCallback.FAILED) {
System.out.println("Recovery of assigned tasks failed.");
} else {
System.out.println( "Assigning recovered tasks" );
//getTasks();
}
}
});
}
@Override
public void process(WatchedEvent e) {
System.out.println("Processing event: " + e.toString());
if(e.getType() == Event.EventType.None){
switch (e.getState()) {
case SyncConnected:
connected = true;
break;
case Disconnected:
connected = false;
break;
case Expired:
expired = true;
connected = false;
System.out.println("Session expiration");
default:
break;
}
}
}
}
8、分布式锁
public class Recipes_Lock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("domain1.book.zookeeper:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
final CountDownLatch down = new CountDownLatch(1);
for(int i = 0; i < 30; i++){
new Thread(new Runnable() {
public void run() {
try {
down.await();//等待线程都创建结束
lock.acquire();
} catch ( Exception e ) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("生成的订单号是 : "+orderNo);
try {
lock.release();
} catch ( Exception e ) {
}
}
}).start();
}
down.countDown();
}
}
网友评论