python 处理异步物化视图同时执行导致内存溢出问题

发布时间 2023-10-18 10:20:46作者: 锦大大的博客呀!
python 处理异步物化视图同时执行导致内存溢出问题
一、前提:因为物化视图过多,同时物化视图到时间同时爆发,导致CPU爆满,所以采用datax自带的调度服务来执行python命令
二、直接看代码:
import pymysql
import pymssql
import datetime
import time
class Materialized_plan:
    def connect_to_starrock(self):
        username = '用户名'
        password = '密码'
        myhost = '服务器'
        port = '9031'
        databasename='库名'
        conn = pymysql.connect(
            host=myhost,
            port=int(port),
            user=username,
            password=password,
            database=databasename
        )
        return conn
    def connect_to_sqlserver(self):
        username = '用户名'
        password = 'eIzo48Z8nh96VuaL0tt3EWDD7'
        myhost = '服务器'
        port = '1433'
        databasename='库名'
        conn = pymssql.connect(server=myhost, user=username, database=databasename,password=password, port=port, charset='utf8')
        return conn
    def GetplanData(self):
        sqlser_conn= self.connect_to_sqlserver()
        selectSql='select * from starrocks_materialized_plan where UpdateTime< CONVERT(date, GETDATE()) order by level'
        cursor = sqlser_conn.cursor()
        cursor.execute(selectSql,('成功'))
        datas = cursor.fetchall()
        print(datas)
        return datas
    def RunPlan(self):
        datas= self.GetplanData()
        for row in datas:
            nowdate=datetime.datetime.now()
            runsql='REFRESH MATERIALIZED VIEW '+row[1]+'.'+row[2]
            starrocks_conn= self.connect_to_starrock()
            cursor = starrocks_conn.cursor()
            try:
                cursor.execute(runsql)
                starrocks_conn.commit()
                print("刷新物化视图成功:"+ runsql)
                # 开始更新数据
                pdate_query = "UPDATE starrocks_materialized_plan SET SuccessOrNot = %s, UpdateTime = %s WHERE id = %s;INSERT INTO [dbo].[starrocks_materialized_plan_log]([PlanId], [ExecutionTime], [Message]) VALUES (%s, %s, %s);"
                sqlserver_conn = self.connect_to_sqlserver()
                sqlserver_cursor = sqlserver_conn.cursor()
                sqlserver_cursor.execute(pdate_query, ('成功', nowdate, row[0], row[0], nowdate, '成功'))
                sqlserver_conn.commit()
                # 完成以后等待指定秒钟
                print(str(row[11])+"秒钟以后继续执行")
                time.sleep(int(row[11]))
            except Exception as e:
                starrocks_conn.rollback()
                print("刷新物化视图失败:", runsql)
                print("错误信息:", e)
                #开始更新数据
                pdate_query = "UPDATE starrocks_materialized_plan SET SuccessOrNot = %s, UpdateTime = %s WHERE id = %s;INSERT INTO [dbo].[starrocks_materialized_plan_log]([PlanId], [ExecutionTime], [Message]) VALUES (%s, %s, %s);"
                sqlserver_conn = self.connect_to_sqlserver()
                sqlserver_cursor = sqlserver_conn.cursor()
                sqlserver_cursor.execute(pdate_query, ('失败', nowdate, row[0], row[0], nowdate, '失败'))
                sqlserver_conn.commit()
            # result= cursor.execute(runsql)
            # print(result.status)
            # print(result.data)
if __name__ == '__main__':
    print('开始执行物化视图循环')
    # time.sleep(10)
    m = Materialized_plan()  # 首先创建Product_Base的实例
    m.RunPlan() 
    print('执行物化视图结束')

相关表:

1. 主表

 2. 日志表:主要是用来记录每天的执行情况

 谢谢学习!!!