美文网首页
prometheus的collect的使用

prometheus的collect的使用

作者: Elvis_zhou | 来源:发表于2018-12-07 11:24 被阅读0次

    之前写的export由于要改进代码,稍作了修改,出现了几个小问题,记录一下。

    之前的代码全写在collect中,因为是获取单个节点的信息,现在要获取多个节点的信息,所以考虑了多个封装函数:

    class CustomCollector(object):
    
        def collect(self):
            # 执行scontrol,获取nodes信息
            try:
                output = subprocess.Popen("scontrol -o show node",
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                out_put = output.communicate()[0]
                if out_put:
                    # 每行是一个node,获取单个node信息
                    for node_params in out_put.splitlines():
                        d = {}
                        for n in node_params.split():
                            node = n.split("=")
                            if node != ['']:
                                d[node[0]] = node[1]
                        self.get_node_params(d)
            except Exception as e:
                logging.error(e)
    
            # 获取jobs状态
            try:
                output = subprocess.Popen("squeue --format '%A:%c:%t'",
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                out_put = output.communicate()[0]
                if out_put:
                    job_run_count = re.findall(r'(\d+?):(\d+?):R', out_put)
                    job_run_sum = len(job_run_count)
                    yield GaugeMetricFamily('slurm_job_running', 'running_count', value=job_run_sum)
    
                    job_pd_count = re.findall(r'(\d+?):(\d+?):PD', out_put)
                    job_pd_sum = len(job_pd_count)
                    yield GaugeMetricFamily('slurm_job_pending', 'pending_count', value=job_pd_sum)
            except Exception as e:
                logging.error(e)
    
        # 获取node参数
        def get_node_params(self, node):
            host_name = node['NodeHostName']
            total_count = int(node['CPUTot'])
            a = GaugeMetricFamily('slurm_cpu_total', 'total_count', labels=['host_name'])
            a.add_metric([host_name], total_count)
            yield a
    
            alloc_count = int(node['CPUAlloc'])
            b = GaugeMetricFamily('slurm_cpu_alloc', 'alloc_count', labels=['host_name'])
            b.add_metric([host_name], alloc_count)
            yield b
    
            total_memory = int(node['RealMemory'])
            c = GaugeMetricFamily('slurm_memory_total', 'total_memory', labels=['host_name'])
            c.add_metric([host_name], total_memory)
            yield c
    
            alloc_memory = int(node['AllocMem'])
            d = GaugeMetricFamily('slurm_memory_alloc', 'alloc_memory', labels=['host_name'])
            d.add_metric([host_name], alloc_memory)
            yield d
    
    
    REGISTRY.register(CustomCollector())
    

    当运行时会报错,显示core.py文件内,desc_func()不是一个iterable。这是啥意思呢,点进去看看:

    def _get_names(self, collector):
            '''Get names of timeseries the collector produces.'''
            desc_func = None
            # If there's a describe function, use it.
            try:
                desc_func = collector.describe
            except AttributeError:
                pass
            # Otherwise, if auto describe is enabled use the collect function.
            if not desc_func and self._auto_describe:
                desc_func = collector.collect
    
            if not desc_func:
                return []
    
            result = []
            type_suffixes = {
                'counter': ['_total', '_created'],
                'summary': ['', '_sum', '_count', '_created'],
                'histogram': ['_bucket', '_sum', '_count', '_created'],
                'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
                'info': ['_info'],
            }
            for metric in desc_func():
                for suffix in type_suffixes.get(metric.type, ['']):
                    result.append(metric.name + suffix)
            return result
    

    出错出在最后一个for循环,desc_func()是什么呢?往上找找,desc_func = collector.collect,这个collector是什么呢,是传进来的参数,再往上找,发现是register调用的我们自己创建的CustomCollector()类,所以这个desc_func = self.collect(),就是我们自己写的collect方法,再看这个for循环,才发现我们的collect没有返回值,所以并不能通过调用get_node_params()方法来实现返回,所以这里要做个处理,使collect是个iterable,并且在get_node_params()方法把它本身所有的yield执行完之后,统一做出yield,修改如下:

    def collect(self):
            # 执行scontrol,获取nodes信息
            try:
                output = subprocess.Popen("scontrol -o show node",
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                out_put = output.communicate()[0]
                if out_put:
                    # 每行是一个node,获取单个node信息
                    for node_params in out_put.splitlines():
                        d = {}
                        for n in node_params.split():
                            node = n.split("=")
                            if node != ['']:
                                d[node[0]] = node[1]
                        # collect方法必须返回一个可迭代对象,这里把get函数的所有yield加载完成,统一返回
                        for metric in self.get_node_params(d):
                            yield metric
            except Exception as e:
                logging.error(e)
    

    在本地跑一点问题都没有,但是 部署在aws上之后,logging一直报错,指向d[node[0]] = node[1]out of range,WTF? 只能使用打印大法,结果发现aws上不仅仅可以查到正在运行的节点,还有已死的节点,这个跟本地不一样,还有一点,我的out_put也存在一个问题,由于是利用空格切割,最后有一个参数 reason= Not Pending, !!! 多了个空格!又得处理这个东西了,修改如下:

    def collect(self):
            # 执行scontrol,获取nodes信息
            try:
                output = subprocess.Popen("scontrol -o show node",
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                out_put = output.communicate()[0]
                if out_put:
                    # 每行是一个node,获取单个node信息
                    for node_params in out_put.splitlines():
                        d = {}
                        last_key = None
                        for n in node_params.split():
                            node = n.split("=")
                            if len(node) < 2:
                                d[last_key] += ' '
                                d[last_key] += node[0]
                            else:
                                d[node[0]] = node[1]
                                last_key = node[0]
                        # collect方法必须返回一个可迭代对象,这里把get函数的所有yield加载完成,统一返回
                        for metric in self.get_node_params(d):
                            yield metric
            except Exception as e:
                logging.error(e)
    

    给定一个last_key,存下最后一个正确的key,当node的长度<2时(也就是那个pending),在last_key的值后面把它填进去就行啦!

    既然部署在aws的instance上,得给文件写个main函数

    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('--port', type=int, help='input a prot', dest='port', default=8010)
        args = parser.parse_args()
    
        coll = CustomCollector()
        print(coll)
    
        app = make_wsgi_app()
        httpd = make_server('', args.port, app)
        httpd.serve_forever()
    
    
    if __name__ == '__main__':
    
        main()
    

    这里用了个库叫argparse,我是用来在命令行执行时,如果没有写端口号就给个默认参数的,简单的解释下:
    parser.add_argument('--port', type=int, help='input a prot', dest='port', default=8010)
    是添加一个参数,--port就是在命令行添加参数时给个命令,dest是参数名底下调用要用到的,default是默认的参数,启动时输入以下代码即可:

    # 这就会默认以8010端口运行
    python export.py
    # 添加参数,以9999运行
    python export.py --port 9999
    

    以上!!!

    相关文章

      网友评论

          本文标题:prometheus的collect的使用

          本文链接:https://www.haomeiwen.com/subject/wynzcqtx.html