python3修改安全组

发布时间 2023-11-16 10:07:34作者: 地铁昌平线

场景:办公网络访问云资源,公司出口IP会变,试试更新到安全组

脚本如下:

#!/usr/bin/env python 
# -*- coding: utf-8 -*-
# @Time    : 2023/11/15 13:12
# @File    : security_group.py
# @Author  : zk_linux
# @Software: PyCharm
# @Description:

import json
import socket
from aliyunsdkecs.request.v20140526 import DescribeSecurityGroupAttributeRequest, RevokeSecurityGroupRequest, AuthorizeSecurityGroupRequest
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526 import RevokeSecurityGroupRequest
from aliyunsdkcore.acs_exception.exceptions import ServerException
import logging
import time

# 认证信息
access_key_id = ""
access_key_secret = ""
# 地域
region_id = "cn-hangzhou"
# 安全组 ID
security_group_id = "sg-"
port = "80/80"

# 公司出口域名
hostname = "www.abc.cn"
# hostname = "baidu.com"

# 初始化实例
client = AcsClient(access_key_id, access_key_secret, 'cn-hangzhou')

# 日志路径
file_path = '/tmp/ip_address.log'

logging.basicConfig(level=logging.INFO,
                    filename=file_path,
                    filemode='a',
                    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
                    )


class Domain_name_resolution:

    def __init__(self, name):
        self.name = name

    def domain_name(self):
        '''
        解析域名--> IP
        :return: IP
        '''
        try:
            ip_address = socket.gethostbyname(self.name)
            return ip_address
        except socket.error as e:
            print(f"Error: {e}")
            return None


class Query_rule:

    def __init__(self, sg_id, client):
        self.sg_id = sg_id
        self.client = client

    def query_rule(self):
        '''
        查询安全组80规则
        :return: 0 规则不存在
        '''

        describe_request = DescribeSecurityGroupAttributeRequest.DescribeSecurityGroupAttributeRequest()
        describe_request.set_SecurityGroupId(self.sg_id)
        describe_response = self.client.do_action_with_exception(describe_request)
        current_ip_ranges = describe_response.decode('utf-8')

        data = json.loads(current_ip_ranges)
        source_cidr_ip = data.get('Permissions', {}).get('Permission', [{}])

        for i in source_cidr_ip:
            if i.get('PortRange') != "80/80":
                return 0
            else:
                return i.get('SourceCidrIp')


class Delete_old_rules:

    def __init__(self, sg_id, port, old_public_ip):
        self.security_group_id = sg_id
        self.port = port
        self.old_public_ip = old_public_ip

    def Delete_old_ip(self):
        '''
        删除规则
        :return:
        '''
        revoke_request = RevokeSecurityGroupRequest.RevokeSecurityGroupRequest()
        revoke_request.set_SecurityGroupId(self.security_group_id)
        revoke_request.set_IpProtocol("tcp")
        revoke_request.set_PortRange(self.port)
        revoke_request.set_SourceCidrIp(self.old_public_ip)
        revoke_response = client.do_action_with_exception(revoke_request)
        return revoke_response


class Add_an_access_rule:
    def __init__(self, sg_id, port, new_ip):
        self.sg_id = sg_id
        self.port = port
        self.new_ip = new_ip

    def Add_rule(self):
        '''
        出口IP变动更新到安全组
        :return:
        '''
        authorize_request = AuthorizeSecurityGroupRequest.AuthorizeSecurityGroupRequest()
        authorize_request.set_SecurityGroupId(self.sg_id)
        authorize_request.set_IpProtocol("tcp")
        authorize_request.set_PortRange(self.port)
        authorize_request.set_SourceCidrIp(self.new_ip)

        authorize_response = client.do_action_with_exception(authorize_request)
        return authorize_response


class Check_public_ip:
    @classmethod
    def Check_ip_is_updated(cls):
        # 获取当前出口新IP
        ojb1 = Domain_name_resolution(hostname)
        new_ip = ojb1.domain_name()
        # 获取安全组中失效IP
        obj = Query_rule(security_group_id, client)
        old_ip = obj.query_rule()
        # 入网80端口存在,先删除,在添加

        if old_ip != 0:
            logging.warning("规则存在")
            if old_ip != new_ip:
                logging.warning("删除规则")
                del_ip = Delete_old_rules(security_group_id, port, old_ip)
                del_response = del_ip.Delete_old_ip()
                logging.warning("删除响应:{}".format(del_response))
                Add = Add_an_access_rule(security_group_id, port, new_ip)
                add_response = Add.Add_rule()
                logging.info("80入网IP更新,更新IP:{}响应:{}".format(new_ip, add_response))
            else:
                logging.info("出口IP未更新.")

        else:
            logging.error("80入网规则不存在")


if __name__ == '__main__':
    while True:
        zk_net = Check_public_ip()
        zk_net.Check_ip_is_updated()
        time.sleep(3)