python监控磁盘使用率

发布时间 2023-07-20 18:10:30作者: 地铁昌平线
#!/usr/bin/env python 
# -*- coding: utf-8 -*-
# @Time    : 2023/6/27 13:56
# @Author  : zk_linux
# @File    : Disk_monitoring.py.py
# @Software: PyCharm
from temp import ail_the_alarm
import psutil
import os

threshold_value = 75

os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def get_disk_info():
    disk_usage = psutil.disk_usage('/').percent
    return disk_usage


def monitoring():
    res = get_disk_info()
    if res >= threshold_value:
     
       webhook = ail_the_alarm.DingTalkUrl('/server/scripts/config.ini').get_config()
       ding_msg = ail_the_alarm.Send_Dingding('HK-集群环境02', webhook['mobile_number'], web_url=webhook['prod_webhook_url'],secret=webhook['prod_secret'], msg="当前磁盘使用率:{} 已超出预值".format(res))
       ding_msg.send_dingnding()
 
if "__name__ == __main__":
    monitoring()

  

[root@acs-hk-ctos7-prod-02 scripts]# cat config.ini 
[MySQL]
master_host=172
master_port=3306
master_user=root
master_password=Z
slave_host=1


[DingTalk]
#生产
prod_webhook_url = https://oapi.dingtalk.com/robot/send?access_token=94790bbfa97c855d241b9eacf5b1d7ed868afd4d863e14f04b04cd8d06
prod_secret=SEC859f1cefff0cc3c202df524222d05c65ac8ffb4ec7da3ecfbe5257
#测试
dev_webhook_url= https://oapi.dingtalk.com/robot/send?access_token=39095663a52a5975a827a87f84af8d14321919536e71e43f90488
dev_secret= SEC359c7842f082d8b7b90ef46bd7d2e3063a1aad29b55b98e0e8519bd
mobile_number=13

[Redis]
redis_master_host= 172.
redis_master_port= 6379
redis_master_password = 
redis_slave_host= 17

[Code]
mobile_phone_number = 1569
signature_name = 智控网络
template_code = SMS_4606
master_monitor_address= http://172..7:6666/user/getNewPublicKey?loginType=1
slave_monitor_address= http://172.318:6666/user/getNewPublicKey?loginType=1

[sms]
phonenumbers = 15657
signname = 网络
templatecode = SMS_46

  

[root@acs-hk-ctos7-prod-02 scripts]# cat temp/ail_the_alarm.py 
#!/usr/bin/env python 
# -*- coding: utf-8 -*-
# @time    : 2023/7/7 10:48
# @File    : temp.py
# @Author  : zk_linux
# @Software: PyCharm
# @Description:
import datetime
import json
import socket
import os
import requests
import configparser
import base64
import hmac
import urllib
import hashlib
import time


class GetDigest:
    @staticmethod
    def get_digest(secret):
        # secret= "SEC359c7842f08286efb1fc5d8b7b90ef46bd7d2e3063a1aad29b55b98e0e8519bd"
        timestamp = str(round(time.time() * 1000))
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return f"&timestamp={timestamp}&sign={sign}"


class GetSysyemInfo:
    @staticmethod
    def get_system_info():
        system_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        host_name = socket.gethostname()
        system_load = os.popen("uptime|awk '{print $NF,$NF-1,$NF-2}'")
        load = system_load.read().strip()
        ip = requests.get('http://ifconfig.me').text.strip()
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        local_ip = s.getsockname()[0]
        s.close()

        return system_time, host_name, load, local_ip ,ip


class DingTalkUrl:
    def __init__(self, config_file):
        self.config_file = config_file

    def get_config(self):
        config = configparser.ConfigParser()
        config.read(self.config_file)
        return config['DingTalk']


class Send_Dingding(GetSysyemInfo):
    # system_info = GetSysyemInfo.get_system_info()

    def __init__(self, *args, **kwargs):
        '''

        :param title,mobile_number
        :param web_url,secret,msg
        '''
        self.args = args
        self.kwargs = kwargs



    def send_dingnding(self):
        '''
        :return: requests  web_url,secret data
        '''
        MSG = (
            "**告警主题**:" + "\n" + str(self.args[0]) + "\n\n"
            " >当前时间:" + "\n" + str(GetSysyemInfo.get_system_info()[0]) + "\n\n"
            " >当前主机名:"  + "\n" + str(GetSysyemInfo.get_system_info()[1]) + "\n\n" \
            " >当前系统load:" + "\n" + str(GetSysyemInfo.get_system_info()[2]) + "\n\n" \
            " >当前服务器IP:" + "\n" + str(GetSysyemInfo.get_system_info()[3]) + "\n" + str(GetSysyemInfo.get_system_info()[4]) +"\n\n" \
            " >详细信息:" + "\n" + self.kwargs['msg'] +  "\n")


        data={
          "msgtype": "markdown",
          "markdown": {
            "title": str(self.args[0]),
            "text": MSG,
          },
          "at": {
            "atMobiles": [
              self.args[1]
            ],
            "isAtAll": False
          }
        }

        headers = {
              'Content-Type': 'application/json'
        }
        req = requests.post(self.kwargs['web_url'] + GetDigest.get_digest(self.kwargs['secret']),json=data,headers=headers)