Zabbix 版本: 4.2.6
Python 3 版本: 3.6.8
下面展示一些封装 Zabbix API 的常用函数:
class zabbix_api_function():
    """Convenience wrapper around a logged-in ``ZabbixAPI`` client.

    The host, host-group and template helpers log their outcome and return
    ``False`` on failure instead of raising.  The user-macro helpers do not
    catch API errors; those propagate to the caller.
    """

    def __init__(self, confPath):
        """Load configuration, set up the logger and log in to Zabbix.

        Args:
            confPath: Path handed to ``param``.  The resulting object must
                expose a ``zabbix`` mapping whose keys match the keyword
                arguments of :meth:`zabbix_init` (``zabbix_url``, ``user``,
                ``password``).
        """
        self.param = param(confPath)
        log_path = os.path.join(BASE_DIR, 'logs', 'zabbix.log')
        print(log_path)
        self.logger = all_info.logger_init(self, log_path)
        self.logger.debug('init zabbix...')
        # The boolean result is not checked here; a failed login is only
        # visible in the log.
        self.zabbix_init(**self.param.zabbix)

    def zabbix_init(self, zabbix_url, user, password):
        """Create the API client and authenticate.

        Returns:
            ``True`` when the login succeeded, ``False`` otherwise (the
            error is logged).
        """
        try:
            self.zapi = ZabbixAPI(server=zabbix_url, path="", log_level=0)
            self.zapi.login(user, password)
        except Exception as e:
            self.logger.error("Failed to connect to zabbix!")
            self.logger.error(e)
            return False
        self.logger.debug("connect to zabbix success!")
        return True

    def _first_host_id(self, host_name):
        """Return the ``hostid`` of the first host whose name is *host_name*.

        Raises:
            IndexError: If the query returns no host.
        """
        return self.zapi.host.get({"filter": {"host": host_name}})[0]['hostid']

    def _find_host_macro(self, host_name, hong_name):
        """Return the macro record named *hong_name* on *host_name*, or ``None``."""
        hostid = self._first_host_id(host_name)
        for macro in self.zapi.usermacro.get({"hostids": hostid}):
            if macro['macro'] == hong_name:
                return macro
        return None

    def get_host_id(self, host_name):
        """Return the ``hostid`` for *host_name*, or ``False`` if the lookup fails."""
        try:
            hostid = self._first_host_id(host_name)
        except Exception as e:
            self.logger.error(f'get {host_name} hostid  Fail!')
            self.logger.error(e)
            return False
        self.logger.debug(f'get {host_name} hostid  Success!')
        return hostid

    def get_group_id(self, group_name):
        """Return the ``groupid`` for *group_name*, or ``False`` if the lookup fails."""
        try:
            groupid = self.zapi.hostgroup.get(
                {"filter": {"name": group_name}})[0]['groupid']
        except Exception as e:
            self.logger.error(f'get {group_name} groupid  Fail!')
            self.logger.error(e)
            return False
        self.logger.debug(f'get {group_name} groupid  Success!')
        return groupid

    def get_group_hosts_list(self, group_name):
        """Return the names of all hosts in *group_name*.

        Returns an empty list when the group cannot be resolved, so the API
        is never queried with an invalid group id.
        """
        group_id = self.get_group_id(group_name)
        if group_id is False:
            return []
        hosts = self.zapi.host.get(
            {"selectGroups": "extend", "groupids": [group_id]})
        return [host['host'] for host in hosts]

    def get_group_inventory_list(self, group_name):
        """Return what ``all_info.all_info_get`` yields for *group_name*."""
        inventory = all_info(confPath=CONF_PATH)
        return inventory.all_info_get(group_name)

    def change_host_group(self, host_name, group_name):
        """Replace the groups of *host_name* with the single group *group_name*.

        Returns:
            ``True`` on success, ``False`` if either lookup or the update
            fails.
        """
        host_id = self.get_host_id(host_name)
        group_id = self.get_group_id(group_name)
        if host_id is False or group_id is False:
            # The failed lookup has already been logged in detail.
            self.logger.error(f'update {host_name} group 信息 Fail!')
            return False
        try:
            # Groups are passed as objects carrying ``groupid``, the same
            # shape create_host() sends.
            self.zapi.host.update(
                {"hostid": host_id, "groups": [{"groupid": group_id}]})
        except Exception as e:
            self.logger.error(f'update {host_name} group 信息 Fail!')
            self.logger.error(e)
            return False
        self.logger.debug(f'update {host_name} group 信息 Success!')
        return True

    def create_group(self, group_name):
        """Create a host group; return ``True`` on success, ``False`` on failure."""
        try:
            self.zapi.hostgroup.create({"name": group_name})
        except Exception as e:
            self.logger.error(f'create {group_name} Fail!')
            self.logger.error(e)
            return False
        self.logger.debug(f'create {group_name} Success!')
        return True

    def create_host(self, host_name, group_name, template_name="Linux OS EPIC"):
        """Create a host in *group_name* and link it to a template.

        The host gets one interface of type 1 on port 10050 that uses
        *host_name* as its IP address, plus a ``Host name`` tag.

        Args:
            host_name: Name of the new host, also used as the interface IP.
            group_name: Name of an existing host group.
            template_name: Name of the template to link.

        Returns:
            ``True`` when the host was created, ``False`` if the group or
            template lookup or the creation itself failed.
        """
        group_id = self.get_group_id(group_name)
        if group_id is False:
            return False
        try:
            templateid = self.zapi.template.get(
                {"filter": {"host": template_name}})[0]['templateid']
        except Exception as e:
            self.logger.error('get EPIC_templateid Fail!')
            self.logger.error(e)
            return False
        self.logger.debug('get EPIC_templateid Success! ')
        # No early return here: the host still has to be created.
        try:
            self.zapi.host.create({
                "host": host_name,
                "interfaces": [{
                    "type": 1, "main": 1, "useip": 1,
                    "ip": host_name, "dns": "", "port": "10050",
                }],
                "groups": [{"groupid": group_id}],
                "tags": [{"tag": "Host name", "value": host_name}],
                "templates": [{"templateid": templateid}],
            })
        except Exception as e:
            self.logger.error(f'create {host_name} Fail!')
            self.logger.error(e)
            return False
        self.logger.debug(f'create {host_name} Success!')
        return True

    def get_all_zabbix_list(self):
        """Return the names of all host groups, or ``False`` if the query fails."""
        try:
            groups = self.zapi.hostgroup.get({"output": "extend"})
        except Exception as e:
            self.logger.error('get all_zabbix_groups_info Fail!')
            self.logger.error(e)
            return False
        self.logger.debug('get all_zabbix_groups_info Success!')
        return [group['name'] for group in groups]

    def get_zabbix_all_hosts(self):
        """Return the names of all hosts, or ``False`` if the query fails."""
        try:
            hosts = self.zapi.host.get({"output": "extend"})
        except Exception as e:
            self.logger.error('get all_zabbix_hosts_info Fail!')
            self.logger.error(e)
            return False
        self.logger.debug('get all_zabbix_hosts_info Success!')
        return [host['host'] for host in hosts]

    def add_hong(self, host_name, hong_name, hong_value):
        """Add the user macro *hong_name* with *hong_value* to *host_name*."""
        self.zapi.usermacro.create({
            "hostid": self._first_host_id(host_name),
            "macro": hong_name,
            "value": hong_value,
        })

    def update_hong(self, host_name, hong_name, hong_value):
        """Set the value of the existing user macro *hong_name* on *host_name*.

        Raises:
            ValueError: If the host has no macro named *hong_name*.
        """
        hostmacroid = self.get_hostmacroid(host_name, hong_name)
        if hostmacroid is None:
            raise ValueError(
                f'macro {hong_name} not found on host {host_name}')
        self.zapi.usermacro.update({
            "hostmacroid": hostmacroid,
            "value": hong_value,
        })

    def get_hostmacroid(self, host_name, hong_name):
        """Return the id of macro *hong_name* on *host_name*, or ``None``."""
        macro = self._find_host_macro(host_name, hong_name)
        return None if macro is None else macro['hostmacroid']

    def get_hostmacroid_value(self, host_name, hong_name):
        """Return the value of macro *hong_name* on *host_name*, or ``None``."""
        macro = self._find_host_macro(host_name, hong_name)
        return None if macro is None else macro['value']

    def get_hostmacroid_list(self, host_name):
        """Return the ids of all user macros defined on *host_name*."""
        macros = self.zapi.usermacro.get(
            {"output": "extend", "hostids": self._first_host_id(host_name)})
        return [macro['hostmacroid'] for macro in macros]

    def get_hostmacroname_list(self, host_name):
        """Return the names of all user macros defined on *host_name*."""
        macros = self.zapi.usermacro.get(
            {"output": "extend", "hostids": self._first_host_id(host_name)})
        return [macro['macro'] for macro in macros]

    def delete_hostmacroid(self, host_name):
        """Delete every user macro defined on *host_name*."""
        host_macros_list = self.get_hostmacroid_list(host_name)
        if not host_macros_list:
            # Nothing to delete; skip the request with an empty id list.
            return
        self.zapi.usermacro.delete(host_macros_list)
网友评论