Python获取prometheus监控和告警数据

发布时间:2023-06-13 16:23:02 作者:创客未来

请参考 https://www.cuiliangblog.cn/detail/article/17

Python 使用 requests 模块获取 API 信息
Prometheus 的 API 接口

/api/v1/targets
/api/v1/query?query=<expr>
/api/v1/query_range?query=<expr>&start=<startstamp>&end=<endstamp>&step=<step>

范例01

class Monitor:
    """
    Fetch scrape-target health from a Prometheus server.

    ``requests`` and ``PROMETHEUS_URL`` are expected to be available in the
    enclosing module.
    """

    # Seconds to wait for Prometheus to answer. Without a timeout the
    # request blocks indefinitely when the server is unresponsive.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets whose health is anything other than "up".
        self.down_list = []

    def target(self):
        """
        Refresh ``up_list`` / ``down_list`` from ``/api/v1/targets``.

        :return: the list of addresses whose health is "up", or ``None``
                 when the server did not answer with HTTP 200.
        """
        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/targets'
        response = requests.request('GET', url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            print('Get targets status failed!')
            return None

        up_list = []
        down_list = []
        for target in response.json()['data']['activeTargets']:
            address = target['discoveredLabels']['__address__']
            if target['health'] == 'up':
                up_list.append(address)
            else:
                down_list.append(address)

        # Replace the previous snapshot instead of appending to it, so that
        # calling target() repeatedly does not accumulate duplicates.
        self.up_list = up_list
        self.down_list = down_list
        return self.up_list

范例02

import requests
from ops_py.settings import PROMETHEUS_URL

class Monitor:
    """
    Fetch single-value (instant query) metrics from a Prometheus server.

    ``requests`` and ``PROMETHEUS_URL`` are expected to be available in the
    enclosing module.
    """

    # Seconds to wait for Prometheus to answer. Without a timeout the
    # request blocks indefinitely when the server is unresponsive.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets whose health is anything other than "up".
        self.down_list = []

    def getQueryValue(self, query):
        """
        Run an instant query and return its first result sample.

        :param query: PromQL expression to evaluate
        :return: the first element of ``data.result``, or ``None`` when the
                 server did not answer with HTTP 200 or the result is empty
        """
        from urllib.parse import quote

        # Percent-encode the expression: characters such as '+', '&', '#'
        # and '%' would otherwise be misread as URL syntax.
        # rstrip('/') tolerates a base URL with or without a trailing slash.
        inquire = (self.usr.rstrip('/') + '/api/v1/query?query='
                   + quote(query, safe=''))
        print(inquire)
        response = requests.request('GET', inquire,
                                    timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        result = response.json()['data']['result']
        # An expression that matches no series yields an empty list.
        return result[0] if result else None

    def get_os_release(self, address):
        """
        Get the kernel release reported by ``node_uname_info``.

        :param address: value of the ``instance`` label to query
        :return: the ``release`` label, or ``None`` when the query returned
                 no data
        """
        query = 'node_uname_info{job="linux",instance="' + address + '"}'
        result = self.getQueryValue(query)
        if result is None:
            return None
        return result['metric']['release']

    def get_up_time(self, address):
        """
        Get the time elapsed since boot as a months/days/hours string.

        A month is counted as 30 days; leftover minutes and seconds are
        dropped.

        :param address: value of the ``instance`` label to query
        :return: e.g. ``'1月5天5小时'``, or ``None`` when the query returned
                 no data
        """
        query = 'sum(time()-node_boot_time_seconds{job="linux",instance="' + address + '"}) by (instance)'
        result = self.getQueryValue(query)
        if result is None:
            return None
        # Sample values arrive as strings holding a float.
        seconds = int(float(result['value'][1]))
        hours = seconds // 3600
        days, hours = divmod(hours, 24)
        months, days = divmod(days, 30)
        # Each number carries its unit; without separators the digits run
        # together and the result cannot be read back unambiguously.
        return str(months) + '月' + str(days) + '天' + str(hours) + '小时'

    def get_cpu_cores(self, address):
        """
        Get the number of CPU cores.

        :param address: value of the ``instance`` label to query
        :return: the core count as returned by Prometheus (a string), or
                 ``None`` when the query returned no data
        """
        query = 'count(node_cpu_seconds_total{job="linux",mode="system",instance="' + address + '"}) by (instance)'
        result = self.getQueryValue(query)
        if result is None:
            return None
        return result['value'][1]

范例03

import time
import requests
from ops_py.settings import PROMETHEUS_URL


class Monitor:
    """
    Fetch server monitoring data (instant and range queries) from Prometheus.

    ``requests`` and ``PROMETHEUS_URL`` are expected to be available in the
    enclosing module.
    """

    # Seconds to wait for Prometheus to answer. Without a timeout the
    # request blocks indefinitely when the server is unresponsive.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Base address of the Prometheus server.
        self.usr = PROMETHEUS_URL
        # Addresses of targets whose health is "up".
        self.up_list = []
        # Addresses of targets whose health is anything other than "up".
        self.down_list = []

    def timeQuery(self, start_time, end_time):
        """
        Build the ``start``/``end``/``step`` suffix of a range-query URL.

        The step is one ninth of the range, which gives about ten samples.

        :param start_time: range start, ``"%Y-%m-%d %H:%M:%S"`` in local time
        :param end_time: range end, same format
        :return: ``'&start=<unix>&end=<unix>&step=<seconds>'``
        """
        time_format = "%Y-%m-%d %H:%M:%S"
        start = int(time.mktime(time.strptime(start_time, time_format)))
        end = int(time.mktime(time.strptime(end_time, time_format)))
        # Prometheus rejects a zero or negative step, which a range shorter
        # than nine seconds would otherwise produce.
        step = max(1, (end - start) // 9)
        return '&start=' + str(start) + '&end=' + str(end) + '&step=' + str(step)

    def target(self):
        """
        Refresh ``up_list`` / ``down_list`` from ``/api/v1/targets``.

        :return: the list of addresses whose health is "up", or ``None``
                 when the server did not answer with HTTP 200.
        """
        # Tolerate a base URL configured with or without a trailing slash.
        url = self.usr.rstrip('/') + '/api/v1/targets'
        response = requests.request('GET', url, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            print('Get targets status failed!')
            return None

        up_list = []
        down_list = []
        for target in response.json()['data']['activeTargets']:
            address = target['discoveredLabels']['__address__']
            if target['health'] == 'up':
                up_list.append(address)
            else:
                down_list.append(address)

        # Replace the previous snapshot instead of appending to it, so that
        # calling target() repeatedly does not accumulate duplicates.
        self.up_list = up_list
        self.down_list = down_list
        return self.up_list

    def getQueryValue(self, query):
        """
        Run an instant query and return its first result sample.

        :param query: PromQL expression to evaluate
        :return: the first element of ``data.result``, or ``None`` when the
                 server did not answer with HTTP 200 or the result is empty
        """
        from urllib.parse import quote

        # Percent-encode the expression: characters such as '+', '&', '#'
        # and '%' would otherwise be misread as URL syntax.
        inquire = (self.usr.rstrip('/') + '/api/v1/query?query='
                   + quote(query, safe=''))
        print(inquire)
        response = requests.request('GET', inquire,
                                    timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        result = response.json()['data']['result']
        # An expression that matches no series yields an empty list.
        return result[0] if result else None

    def getQueryRange(self, query, time_range):
        """
        Run a range query.

        :param query: PromQL expression to evaluate
        :param time_range: URL suffix as produced by :meth:`timeQuery`
        :return: the ``data.result`` list, or ``None`` when the server did
                 not answer with HTTP 200
        """
        from urllib.parse import quote

        # Only the expression is encoded; time_range is already URL syntax.
        inquire = (self.usr.rstrip('/') + '/api/v1/query_range?query='
                   + quote(query, safe='') + time_range)
        print(inquire)
        response = requests.request('GET', inquire,
                                    timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        return response.json()['data']['result']

    def get_cpu_use_rate(self, *params):
        """
        Get the user-mode CPU usage percentage.

        Two call forms are supported:

        * ``(address)`` - current value for one instance
        * ``(start_time, end_time)`` - range data for all instances, with
          the times formatted as :meth:`timeQuery` expects

        :return: ``'<value>%'`` rounded to two decimals for the one-argument
                 form, the raw range-query result for the two-argument form,
                 or ``None`` when no data is available or the number of
                 arguments is not supported
        """
        if len(params) == 1:
            address = params[0]
            query = 'avg(rate(node_cpu_seconds_total{job="linux",instance="' + address + '",mode="user"}[2m])) by (instance) *100'
            result = self.getQueryValue(query)
            if result is None:
                return None
            value = round(float(result['value'][1]), 2)
            return str(value) + '%'
        if len(params) == 2:
            query = 'avg(rate(node_cpu_seconds_total{job="linux",mode="user"}[2m])) by (instance) *100'
            time_range = self.timeQuery(params[0], params[1])
            return self.getQueryRange(query, time_range)
        print('异常参数')
        return None


参考地址:https://www.cnblogs.com/faberbeta/p/16932674.html