kazoo是一个提供访问调用zookeeper方法的一个python包。kazoo的官方文档如下:
https://kazoo.readthedocs.io/en/latest/
from kazoo.client import KazooClient
import configparser
import logging
import socket
hostname = socket.gethostname()
logging.basicConfig(filename="test.log",format='%(asctime)s %(levelname)s \
%(message)s',level=logging.DEBUG)
#获取zookpeer的hosts配置
cf = configparser.ConfigParser()
cf.read("test.conf")
zkhosts = cf.get("test-zk","zk_hosts")
#启动zk链接
zk = KazooClient(hosts=zkhosts)
zk.start()
#创建znode,如果节点不存在,则创建
if not zk.exists("/zk_ha"):
zk.create("/zk_ha")
if not zk.exists("/zk_ha/conf"):
zk.create("/zk_ha/conf")
if not zk.exists("/zk_ha/nodes"):
zk.create("/zk_ha/nodes")
#创建临时有序文件并写入数据hostname
zk.create("/zk_ha/nodes/node",bytes(hostname,\
encoding="utf-8"),ephemeral=True,sequence=True)
while True:
#获取children节点中最小编号的节点
child = zk.get_children("/zk_ha/nodes")
child_list = sorted(child)
min_child = child_list[0]
min_child_path = "/zk_ha/nodes/" + min_child
#读取最小children节点的数据
if zk.exists(min_child_path):
try:
data,stat = zk.get(min_child_path)
x = data.decode("utf-8")
except Exception as emsg:
break
#获取状态信息
datac,statc = zk.get("/zk_ha/conf")
i = datac.decode("utf-8")
#如果若果最小children节点为本节点所创建的,
#则使用conf中的数据开始处理任务
if x == hostname:
print("test ok %s"%i)
if i:
i = int(i) + 1
else:
i =1
zk.set("/zk_ha/conf",bytes(str(i),encoding="utf-8"))
else:
continue
zk.stop()
网友评论