之前写的export由于要改进代码,稍作了修改,出现了几个小问题,记录一下。
之前的代码全写在collect中,因为是获取单个节点的信息,现在要获取多个节点的信息,所以考虑了多个封装函数:
class CustomCollector(object):
    """Collector that builds gauge metrics from the output of `scontrol` and `squeue`."""

    def collect(self):
        """Run `scontrol` and `squeue` and yield gauge metrics for the job counts.

        Any exception raised in either section is logged via `logging.error`
        and not re-raised.
        """
        # Run scontrol to get the nodes' information.
        try:
            output = subprocess.Popen("scontrol -o show node",
                                      stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            out_put = output.communicate()[0]
            if out_put:
                # Each line is one node; extract that node's information.
                for node_params in out_put.splitlines():
                    d = {}
                    # Tokens are whitespace-separated "Key=Value" pairs.
                    for n in node_params.split():
                        node = n.split("=")
                        if node != ['']:
                            d[node[0]] = node[1]
                    # NOTE(review): get_node_params contains `yield`, so this call
                    # only creates a generator object. It is never iterated here,
                    # so none of the per-node metrics are yielded from collect().
                    self.get_node_params(d)
        except Exception as e:
            logging.error(e)
        # Get the jobs' state.
        try:
            output = subprocess.Popen("squeue --format '%A:%c:%t'",
                                      stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            out_put = output.communicate()[0]
            if out_put:
                # Count the "digits:digits:R" entries in the output.
                # NOTE(review): the pattern is not anchored at the end, so any
                # third field that merely starts with "R" is counted as well.
                job_run_count = re.findall(r'(\d+?):(\d+?):R', out_put)
                job_run_sum = len(job_run_count)
                yield GaugeMetricFamily('slurm_job_running', 'running_count', value=job_run_sum)
                # Count the "digits:digits:PD" entries in the output.
                job_pd_count = re.findall(r'(\d+?):(\d+?):PD', out_put)
                job_pd_sum = len(job_pd_count)
                yield GaugeMetricFamily('slurm_job_pending', 'pending_count', value=job_pd_sum)
        except Exception as e:
            logging.error(e)

    # Get the node parameters.
    def get_node_params(self, node):
        """Yield four gauge metric families for one node, labelled by host name.

        `node` is a dict that must contain the keys 'NodeHostName', 'CPUTot',
        'CPUAlloc', 'RealMemory' and 'AllocMem'; all but the host name are
        converted with int(). Because this is a generator, a missing key or a
        non-integer value only raises once the generator is iterated.
        """
        host_name = node['NodeHostName']
        total_count = int(node['CPUTot'])
        a = GaugeMetricFamily('slurm_cpu_total', 'total_count', labels=['host_name'])
        a.add_metric([host_name], total_count)
        yield a
        alloc_count = int(node['CPUAlloc'])
        b = GaugeMetricFamily('slurm_cpu_alloc', 'alloc_count', labels=['host_name'])
        b.add_metric([host_name], alloc_count)
        yield b
        total_memory = int(node['RealMemory'])
        c = GaugeMetricFamily('slurm_memory_total', 'total_memory', labels=['host_name'])
        c.add_metric([host_name], total_memory)
        yield c
        alloc_memory = int(node['AllocMem'])
        d = GaugeMetricFamily('slurm_memory_alloc', 'alloc_memory', labels=['host_name'])
        d.add_metric([host_name], alloc_memory)
        yield d
# Register an instance of the collector with REGISTRY at import time.
REGISTRY.register(CustomCollector())
当运行时会报错,显示core.py文件内,desc_func()不是一个iterable。这是啥意思呢,点进去看看:
def _get_names(self, collector):
    '''Get names of timeseries the collector produces.

    Calls ``collector.describe()`` when the collector has that attribute;
    otherwise, when ``self._auto_describe`` is set, calls
    ``collector.collect()``. Whatever is called must return an iterable of
    metrics, because the result is consumed by a ``for`` loop. Returns a
    list of ``metric.name + suffix`` strings, or ``[]`` when neither
    function is available.
    '''
    desc_func = None
    # If there's a describe function, use it.
    try:
        desc_func = collector.describe
    except AttributeError:
        pass
    # Otherwise, if auto describe is enabled use the collect function.
    if not desc_func and self._auto_describe:
        desc_func = collector.collect
    if not desc_func:
        return []
    result = []
    # Name suffixes per metric type; a type not listed here maps to [''],
    # i.e. the bare metric name.
    type_suffixes = {
        'counter': ['_total', '_created'],
        'summary': ['', '_sum', '_count', '_created'],
        'histogram': ['_bucket', '_sum', '_count', '_created'],
        'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
        'info': ['_info'],
    }
    # desc_func is called here, so its return value has to be iterable.
    for metric in desc_func():
        for suffix in type_suffixes.get(metric.type, ['']):
            result.append(metric.name + suffix)
    return result
出错的地方在最后一个for循环里的desc_func()。desc_func是什么呢?往上找,desc_func = collector.collect,而collector是传进来的参数;再往上找,发现它就是register时传入的、我们自己创建的CustomCollector()实例。所以desc_func就是我们自己写的collect方法,desc_func()就是在调用collect(),而这个for循环要求collect()返回一个可迭代对象。
再看我们的collect:节点部分只是调用了self.get_node_params(d),并没有把结果返回出去。get_node_params()里用的是yield,调用它只会得到一个生成器对象,如果不去遍历,里面的代码根本不会执行,更不可能通过它把metric返回给collect的调用方。所以这里要做个处理:让collect本身是个iterable,在collect里遍历get_node_params(),把它产生的metric逐个yield出去,修改如下:
def collect(self):
    """Yield the metrics produced by `get_node_params` for every line of `scontrol` output.

    Any exception is logged via `logging.error` and ends the iteration.
    """
    # Run scontrol to get the nodes' information.
    try:
        output = subprocess.Popen("scontrol -o show node",
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        out_put = output.communicate()[0]
        if out_put:
            # Each line is one node; extract that node's information.
            for node_params in out_put.splitlines():
                d = {}
                for n in node_params.split():
                    node = n.split("=")
                    # NOTE(review): a token without "=" splits into a one-element
                    # list, which passes this check and makes node[1] raise
                    # IndexError.
                    if node != ['']:
                        d[node[0]] = node[1]
                # collect() must return an iterable, so re-yield everything the
                # helper generator produces.
                for metric in self.get_node_params(d):
                    yield metric
    except Exception as e:
        logging.error(e)
在本地跑一点问题都没有,但是部署到AWS上之后,logging一直报错,指向 d[node[0]] = node[1] 这一行,提示 index out of range,WTF?只能使用打印大法,结果发现AWS上不仅可以查到正在运行的节点,还能查到已死的节点,这一点跟本地不一样。还有一点,我对out_put的处理本身也存在问题:每一行是利用空格切割的,而输出的最后有一个参数 reason= Not Pending,它的值里多了个空格!!!这样切割之后会多出不含“=”的片段,再按“=”切分就只有一个元素,取node[1]自然越界。又得处理这个东西了,修改如下:
def collect(self):
    """Yield per-node metrics parsed from ``scontrol -o show node``.

    Every output line describes one node as whitespace-separated
    ``Key=Value`` tokens. Each line is parsed into a dict and passed to
    ``self.get_node_params``, whose metrics are re-yielded so that this
    method is itself the iterable of metrics.

    Failures are logged with ``logging.error`` instead of being raised: if
    the command cannot be run nothing is yielded, and a node whose line
    cannot be turned into metrics is skipped without affecting the
    remaining nodes.
    """
    # Run scontrol to get the nodes' information.
    try:
        # An argument list avoids going through a shell. universal_newlines=True
        # makes communicate() return text rather than bytes, so the str-based
        # splitting below also works on Python 3.
        output = subprocess.Popen(["scontrol", "-o", "show", "node"],
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  universal_newlines=True)
        out_put = output.communicate()[0]
    except Exception as e:
        # Best effort: a failed scrape must not take the exporter down.
        logging.error(e)
        return
    # Each line is one node; extract that node's information.
    for node_params in out_put.splitlines():
        d = {}
        last_key = None
        for n in node_params.split():
            # Split on the first "=" only, so a value containing "=" stays whole.
            node = n.split("=", 1)
            if len(node) < 2:
                # No "=": this token is the continuation of the previous value,
                # which contained a space. Ignore it if no key has been seen yet.
                if last_key is not None:
                    d[last_key] += ' ' + node[0]
            else:
                d[node[0]] = node[1]
                last_key = node[0]
        if not d:
            # Blank line: nothing to report.
            continue
        # collect() must return an iterable, so re-yield everything the helper
        # generator produces. Handling errors per node keeps one bad line from
        # hiding all the nodes after it.
        try:
            for metric in self.get_node_params(d):
                yield metric
        except Exception as e:
            logging.error(e)
给定一个last_key,存下最后一个正确的key,当node的长度<2时(也就是那个pending),在last_key的值后面把它填进去就行啦!
既然部署在aws的instance上,得给文件写个main函数
def main():
    """Parse the ``--port`` option (default 8010) and serve the WSGI app on it.

    Prints a freshly created ``CustomCollector`` before starting the server,
    then blocks in ``serve_forever()``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, help='input a prot', dest='port', default=8010)
    port = parser.parse_args().port
    print(CustomCollector())
    # Host '' is passed through to make_server unchanged.
    make_server('', port, make_wsgi_app()).serve_forever()


if __name__ == '__main__':
    main()
这里用了个库叫argparse
,我是用来在命令行执行时,如果没有写端口号就给个默认参数的,简单的解释下:
parser.add_argument('--port', type=int, help='input a prot', dest='port', default=8010)
这一行的作用是添加一个参数:--port 是在命令行里使用的选项名;dest 是解析之后在代码里访问这个参数用的属性名(也就是下面用到的 args.port);default 是命令行没有指定该选项时使用的默认值。启动时输入以下命令即可:
# 这就会默认以8010端口运行
python export.py
# 添加参数,以9999运行
python export.py --port 9999
以上!!!
网友评论