短信接口请自行实现
此处主要参考:https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/alerts.md
# -*- coding:utf-8 -*-
import datetime
import sys
import requests
'''修改编码字符集为utf-8'''
default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
reload(sys)
sys.setdefaultencoding(default_encoding)
intervals = (
('周', 604800), # 60 * 60 * 24 * 7
('天', 86400), # 60 * 60 * 24
('小时', 3600), # 60 * 60
('分钟', 60),
('秒', 1),
)
def display_time(seconds, granularity=2):
result = []
for name, count in intervals:
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('s')
result.append("{}{}".format(value, name))
else:
# Add a blank if we're in the middle of other values
if len(result) > 0:
result.append(None)
return ''.join([x for x in result[:granularity] if x is not None])
def time_format(timestamp):
dateArray = datetime.datetime.utcfromtimestamp(timestamp / 1000)
return dateArray.strftime("%Y-%m-%d %H:%M:%S")
def get_alert_items():
global items_
auth = ('admin', 'admin')
##此处ip地址为ambari-server节点的ip或hostname
url = 'http://192.168.0.101:8080/api/v1/clusters/xcarbigdata/alerts'
params = {
'fields': '*',
'Alert/state': 'CRITICAL',
'sortBy': 'Alert/original_timestamp'
}
response = requests.get(url, params=params, auth=auth)
response_json = response.json()
return response_json['items']
items_ = get_alert_items()
alert_messages=[]
for item in items_:
alert_ = item['Alert']
original_timestamp_ = alert_['original_timestamp']
latest_timestamp_ = alert_['latest_timestamp']
timestamp_ = (latest_timestamp_ - original_timestamp_) / 1000
alert_['duration'] = display_time(timestamp_)
alert_['original_timestamp'] = time_format(original_timestamp_)
alert_messages.append('组件:%(service_name)s/%(host_name)s \r\n'
'告警内容:%(label)s:%(text)s\r\n'
'开始时间:%(original_timestamp)s\r\n'
'持续时间:%(duration)s\r\n' % alert_)
if alert_messages:
send_sms('\r\n'.join(alert_messages))#send_sms(message)方法请自行实现
网友评论