package zk.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class TestCurator {
public static final Logger logger = LoggerFactory.getLogger(TestCurator.class);
public CuratorFramework client = null;
@Before
public void start() {
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
client = CuratorFrameworkFactory
.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("test")
.build();
client.start();
}
/**
* zookeeper恢复数据:从事物日志中进行恢复数据(事物日志类似于mysql的bin log)
* 每次操作都会有一个事物id,连接 关闭 增删改 包括创建失败删除失败(对于查询和添加watch不增加事物id)
*/
/**
cZxid = 0x3 创建节点的事物id
ctime = Fri Oct 2 创建节点的时间
mZxid = 0x34 节点数据修改的事物id
mtime = Fri Oct 2 节点数据修改的时间
pZxid = 0x35 子节点删除或者增加的事物id(不包含子节点的数据修改)
cversion = 8 子节点删除或者增加的版本 (不包含子节点的数据修改)
dataVersion = 1 当前节点的数据修改的版本
aclVersion = 0 权限控制
ephemeralOwner = 临时节点的会话id
dataLength = 3 当前节点数据的长度
numChildren = 4 当前节点的子节点数
*/
/**
子节点删除增加:影响父节点pZxid cversion numChildren
当前节点数据修改:影响当前节点 data mZxid mtime dataVersion dataLength
节点创建:ctime cZxid
权限:aclVersion
临时节点:ephemeralOwner
子节点的数据修改不影响父节点
*/
/**
* 只监控直属子节点的增删,直属子节点数据修改 (当删除该节点,在创建时无法继续监听)
* /test/province不存在会自动创建
* 启动时 /test/province子节点存在会触发
*
* @throws Exception
*/
@Test
public void childChange() throws Exception {
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/province", true);
ListenerContainer<PathChildrenCacheListener> listenable = pathChildrenCache.getListenable();
listenable.addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
logger.info("==============================================");
logger.info("curatorFramework:" + curatorFramework);
logger.info("pathChildrenCacheEvent:" + pathChildrenCacheEvent);
List<ChildData> currentData = pathChildrenCache.getCurrentData();
logger.info("currentData:" + currentData);
}
});
pathChildrenCache.start();
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 监听当前节点 删除(currentData == null) 修改数据 创建该节点(即使从父类开始新建) 启动存在会触发
*
* @throws Exception
*/
@Test
public void nodeChange() throws Exception {
final NodeCache nodeCache = new NodeCache(client, "/province/shanxi");
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
logger.info("==============================================");
logger.info("nodeChanged:" + currentData);
}
});
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void treeChange() throws Exception {
TreeCache treeCache = new TreeCache(client, "/");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
logger.info("==============================================");
logger.info("treeCacheEvent:" + treeCacheEvent);
}
});
treeCache.start();
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void createContainers() throws Exception {
client.createContainers("/container");
}
@Test
public void createNode() throws Exception {
client.create().forPath("/province");
}
@Test
public void createPersistentNode() throws Exception {
client.create().withMode(CreateMode.PERSISTENT).forPath("/shanxi");
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/hebei");
}
@Test
public void createEphemeralNode() throws Exception {
client.create().withMode(CreateMode.EPHEMERAL).forPath("/shanxi1");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/hebei1");
Thread.sleep(Integer.MAX_VALUE);
}
@Test
public void createChildNode() throws Exception {
client.create().forPath("/province/shanxi");
}
@Test
public void deleteNode() throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/province");
}
@Test
public void setData() throws Exception {
client.setData().forPath("/province", "data".getBytes());
}
@Test
public void createNodes() throws Exception {
client.create().creatingParentsIfNeeded().forPath("/province/shanxi");
client.create().creatingParentContainersIfNeeded().forPath("/province1/shanxi1");
}
@Test
public void deleteParentNode() throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/province");
}
@Test
public void getDataStat() throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/");
logger.info(stat.toString());
}
@Test
public void getData() throws Exception {
byte[] bytes = client.getData().forPath("/");
logger.info(new String(bytes));
}
@Test
public void setVersionData() throws Exception {
Stat stat = client.setData().withVersion(3).forPath("/province", "data".getBytes());
logger.info(stat.toString());
}
@After
public void stop() {
if (client != null) {
client.close();
}
}
}
package zk.curator;
/**
* Created by paul on 2018/10/27.
*/
import org.apache.zookeeper.client.FourLetterWordMain;
import org.apache.zookeeper.server.LogFormatter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class TestZookeeperTools {
Logger logger = LoggerFactory.getLogger(TestZookeeperTools.class);
/**
* 对zookeeper的事物日志进行查看
* 18-10-26 下午06时27分03秒 session 0x10000182f6a0003 cxid 0x3 zxid 0xb setData '/test,#6e6968616f,1
* session 会话id
* cxid 客户端操作id
* zxid 服务端全局事物id
*
* @throws Exception
*/
@Test
public void logExplain() throws Exception {
LogFormatter.main(new String[]{"C:\\pengrun\\work\\software\\zookeeper-3.4.12\\bin\\data\\version-2\\log.1"});
}
/**
* 对zookeeper的运行状态进行socket命令行查看
*
* @throws IOException
*/
/*
static {
cmd2String.put(confCmd, "conf");打印出server的配置信息
cmd2String.put(consCmd, "cons");打印出server上的所有连接/会话 ***************
cmd2String.put(crstCmd, "crst");重置有关连接/会话的统计数据
cmd2String.put(dumpCmd, "dump");这个命令只有发给ensemble中的爹有才效. 列出所有重要的会话和生命周期随会话的znode
cmd2String.put(enviCmd, "envi");打印出server运行环境
cmd2String.put(getTraceMaskCmd, "gtmk");获取当前的trace mask值, 以10进制64位有符号数值形式返回, 具体trace mask的含义下面会讲
cmd2String.put(ruokCmd, "ruok"); are you ok, 如果回复 imok, 则说明这个server很健康. 如果server有问题, 则不会收到回复. 需要注意的是, 一个server回复了ruok不代表这个server在ensemble中的状态是正常的, 这仅代表server进程正常启动了. 要查看ensemble的概况需要用stat命令
cmd2String.put(setTraceMaskCmd, "stmk");设置trace mask
cmd2String.put(srstCmd, "srst");重置server上的所有统计数据
cmd2String.put(srvrCmd, "srvr");列出server的全部信息
cmd2String.put(statCmd, "stat");列出server的细节信息和与之相连的clients
cmd2String.put(wchcCmd, "wchc");列出监控这个server的所有会话, 并列出每个会话监控的名称空间路径. 注意, 在会话较多的server上, 这个命令可能会相当耗时
cmd2String.put(wchpCmd, "wchp");列出被监控的所有层级名称空间路径, 以及相关的会话. 注意同上, 这个命令也可能会相当耗时
cmd2String.put(wchsCmd, "wchs");列出对该server的所有监控(watch)
cmd2String.put(mntrCmd, "mntr");列出有关ensemble的一系列状态值. 通过这些状态值可以查看整个ensemble是不是正常
cmd2String.put(isroCmd, "isro");检查server是否运行在只读状态, 回复ro代表server在只读状态, 回复rw代表server在可读可写状态
}
*/
@Test
public void sendCmd() throws IOException {
String commands = System.getProperty("zookeeper.4lw.commands.whitelist");
//ServerCnxn.isEnabled(cmd)
//System.getProperty(ZOOKEEPER_4LW_COMMANDS_WHITELIST);
//private static final String ZOOKEEPER_4LW_COMMANDS_WHITELIST = "zookeeper.4lw.commands.whitelist";
//String[] list = commands.split(",");
//java启动设置 -Dzookeeper.4lw.commands.whitelist=*
String results0 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "cons");//当前session链接详细信息
String results1 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "dump");//session和临时节点
logger.info("\n" + results0);
System.out.println("=======================================");
logger.info("\n" + results1);
String results2 = FourLetterWordMain.send4LetterWord("127.0.0.1", 2181, "wchc");//session和它watch的路径
System.out.println("=======================================");
logger.info("\n" + results2);
}
}
网友评论