以下代码的python版本是2.7.15
工作中需要对平台指标的数据进行二次汇聚,故需要写部分python脚本,故在此记录一下。
先看一下Base类。
由于URL涉及公司业务,所以暂时删除掉了;该类主要封装了formatURL主要是对url进行一个format,request就是一个http请求工具,getMetricData是对指标数据进行汇聚,dictTransformToList是将dict转化为列表
import requests
import re
import json
class Base(object):
def __init__(self):
self.name = 'Base'
self.URL = ''
def formatUrl(self, url, _dict):
keys = re.findall(r'{\w+}', url)
for key in keys:
if _dict.has_key(key[1:-1]):
url = url.replace('$' + key, _dict[key[1:-1]])
return url
def request(self, *args):
url = self.formatUrl(self.URL, {'metricName': args[0] if len(args) > 0 else self.metricName, 'appID': self.appID})
response = None
try:
response = {"data": requests.get(url, timeout=5000), "errorCode": 0}
except requests.exceptions.RequestException:
response = {"data": {}, "errorCode": -1, "errorMsg": "Request exception"}
except requests.exceptions.HTTPError:
response = {"data": {}, "errorCode": -2, "errorMsg": "HHTP exception"}
except requests.exceptions.ConnectionError:
response = {"data": {}, "errorCode": -3, "errorMsg": "Connection exception"}
except requests.exceptions.Timeout:
response = {"data": {}, "errorCode": -4, "errorMsg": "Timeout exception"}
except requests.exceptions.Exception:
response = {"data": {}, "errorCode": -5, "errorMsg": "Unknown exception"}
finally:
print type(response['data'])
if response["errorCode"] == 0:
return response['data']['text']
else:
return {'metrics': []}
def getMetricsData(self, metricList):
metricData = {}
for item in metricList:
metricNameslist = item['metricname'].split('#')
parentKeyName = metricNameslist[self.metricKeysIndics[0]]
childKeyName = metricNameslist[self.metricKeysIndics[1]] if len(self.metricKeysIndics) == 2 else ''
metricData[parentKeyName] = metricData[parentKeyName] if metricData.has_key(parentKeyName) else {}
for metricNameIndex in self.metricNameIndics:
metricName = metricNameslist[metricNameIndex]
metricData[parentKeyName][metricName] = item['metrics'].values().pop()
return metricData
def dictTransformToList(self, _dict):
return map(lambda key:dict({'_key': key}, **_dict[key]), _dict.keys())
抽出来一个指标服务进行举例说明:
从ambari中获取指标数据,然后进行整理,比如计算集群的某个值,该集群有20个节点,那么将20个节点的数据进行统计,进行汇总出来集群的数据,如下所示:
from base import Base
class Zookeeper(Base):
def __init__(self):
super(Zookeeper, self).__init__()
self.appID = 'zookeeper'
self.metricName = 'zkInfo%23%25'
self.metricPrefix = '_zkInfo#'
self.metricKeysIndics = [2]
self.metricNameIndics = [1]
self.ret = {'errorcode' : 0, 'errormsg' : '' }
def run(self):
data = self.request()
if len(data['metrics']) == 0:
return dict(self.ret, **{
'errorcode': -1
})
response = self.getMetricsData(data['metrics'])
response = self.dictTransformToList(response)
response = self.metricReduce(response)
res = {}
for key in response.keys():
res[self.metricPrefix + '_' + key] = response[key]
print dict(self.ret, ** {
'data': res
})
def metricReduce(self, metricDataList):
def reduceFn(prev, now):
return {
此处指标名称涉及公司业务,故删除,主要是进行reduce计算
}
return reduce(reduceFn, metricDataList, {})
完。
网友评论