dolphinscheduler工作流部署

发布时间:2023-04-04 14:00:48　作者:东哥加油!!!

Python 代码参考如下:

import pymysql

# Source database: every migrate_* / query_* function reads rows from here.
# NOTE(review): host and password look like placeholders -- fill in real
# values (ideally from configuration, not source code) before running.
source_conn = pymysql.connect(
    database='dolphinscheduler',
    user='root',
    password='pppp',
    host='x.x.x.x',
    port=3306,
)

# Target database: rows are deleted and re-inserted here.
target_conn = pymysql.connect(
    database='dolphinscheduler',
    user='root',
    password='xxxx',
    host='x.x.x.x',
    port=3308,
)






# Copy task definitions from the source environment into the target.
def migrate_t_ds_task_definition(project_code, workflow_code=None):
    """Replace ``t_ds_task_definition`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate. Used as the filter when
            ``workflow_code`` is None.
        workflow_code: Optional workflow (process definition) code. When
            given, only tasks referenced as ``post_task_code`` by that
            workflow in ``t_ds_process_task_relation`` are migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by task ``code`` only for
          codes found in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_task_definition'
    column_str = 'code,name,version,description,project_code,user_id,task_type,task_execute_type,task_params,flag,task_priority,worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,task_group_id,task_group_priority,cpu_quota,memory_max,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = (
                    "select {} from {} where code in "
                    "(select post_task_code from t_ds_process_task_relation"
                    " where process_definition_code=%s)"
                ).format(column_str, table_name)
                s_cursor.execute(select_sql, (workflow_code,))
                result_set = s_cursor.fetchall()
                # code is the first selected column; delete each code once.
                delete_sql = "delete from {} where code=%s".format(table_name)
                for code in {row[0] for row in result_set}:
                    t_cursor.execute(delete_sql, (code,))

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise



def migrate_t_ds_task_definition_log(project_code, workflow_code=None):
    """Replace ``t_ds_task_definition_log`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate. Used as the filter when
            ``workflow_code`` is None.
        workflow_code: Optional workflow (process definition) code. When
            given, only log rows whose ``code`` is referenced as
            ``post_task_code`` by that workflow in
            ``t_ds_process_task_relation`` are migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by task ``code`` only for
          codes found in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_task_definition_log'
    column_str = 'code,name,version,description,project_code,user_id,task_type,task_execute_type,task_params,flag,task_priority,worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,task_group_id,task_group_priority,operate_time,cpu_quota,memory_max,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = (
                    "select {} from {} where code in "
                    "(select post_task_code from t_ds_process_task_relation"
                    " where process_definition_code=%s)"
                ).format(column_str, table_name)
                s_cursor.execute(select_sql, (workflow_code,))
                result_set = s_cursor.fetchall()
                # Several rows can share one code (the table has a version
                # column), so de-duplicate before deleting.
                delete_sql = "delete from {} where code=%s".format(table_name)
                for code in {row[0] for row in result_set}:
                    t_cursor.execute(delete_sql, (code,))

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise





def migrate_t_ds_process_definition(project_code, workflow_code=None):
    """Replace ``t_ds_process_definition`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate.
        workflow_code: Optional workflow (process definition) code. When
            given, only the row(s) with that ``code`` inside the project are
            migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by ``code`` only for codes
          found in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_process_definition'
    column_str = 'code,name,version,description,project_code,release_state,user_id,global_params,flag,locations,warning_group_id,timeout,tenant_id,execution_type,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = "select {} from {} where project_code=%s and code=%s".format(
                    column_str, table_name)
                s_cursor.execute(select_sql, (project_code, workflow_code))
                result_set = s_cursor.fetchall()
                # code is the first selected column; delete each code once.
                delete_sql = "delete from {} where code=%s".format(table_name)
                for code in {row[0] for row in result_set}:
                    t_cursor.execute(delete_sql, (code,))

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise

def migrate_t_ds_process_definition_log(project_code, workflow_code=None):
    """Replace ``t_ds_process_definition_log`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate.
        workflow_code: Optional workflow (process definition) code. When
            given, only log rows with that ``code`` inside the project are
            migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by ``code`` only for codes
          found in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_process_definition_log'
    column_str = 'code,name,version,description,project_code,release_state,user_id,global_params,flag,locations,warning_group_id,timeout,tenant_id,execution_type,operator,operate_time,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = "select {} from {} where project_code=%s and code=%s".format(
                    column_str, table_name)
                s_cursor.execute(select_sql, (project_code, workflow_code))
                result_set = s_cursor.fetchall()
                # Several rows can share one code (the table has a version
                # column), so de-duplicate before deleting.
                delete_sql = "delete from {} where code=%s".format(table_name)
                for code in {row[0] for row in result_set}:
                    t_cursor.execute(delete_sql, (code,))

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise


# Copy workflow/task relations from the source environment into the target.
def migrate_t_ds_process_task_relation(project_code, workflow_code=None):
    """Replace ``t_ds_process_task_relation`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate.
        workflow_code: Optional workflow (process definition) code. When
            given, only relations of that workflow inside the project are
            migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by
          ``(project_code, process_definition_code)`` only for pairs found
          in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_process_task_relation'
    column_str = 'name,project_code,process_definition_code,process_definition_version,pre_task_code,pre_task_version,post_task_code,post_task_version,condition_type,condition_params,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = (
                    "select {} from {} where project_code=%s"
                    " and process_definition_code=%s"
                ).format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code, workflow_code))
                result_set = s_cursor.fetchall()
                # Columns 1 and 2 are project_code and process_definition_code.
                # Each distinct pair is deleted once, not once per row.
                delete_sql = (
                    "delete from {} where project_code=%s"
                    " and process_definition_code=%s"
                ).format(table_name)
                for pair in {(row[1], row[2]) for row in result_set}:
                    t_cursor.execute(delete_sql, pair)

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise


def migrate_t_ds_process_task_relation_log(project_code, workflow_code=None):
    """Replace ``t_ds_process_task_relation_log`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate.
        workflow_code: Optional workflow (process definition) code. When
            given, only relation-log rows of that workflow inside the
            project are migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): every target row of the
          project is deleted, then all source rows of the project are
          inserted.
        * Workflow mode: target rows are deleted by
          ``(project_code, process_definition_code)`` only for pairs found
          in the source, then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_process_task_relation_log'
    column_str = 'name,project_code,process_definition_code,process_definition_version,pre_task_code,pre_task_version,post_task_code,post_task_version,condition_type,condition_params,operator,operate_time,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where project_code=%s".format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where project_code=%s".format(table_name),
                    (project_code,))
            else:
                select_sql = (
                    "select {} from {} where project_code=%s"
                    " and process_definition_code=%s"
                ).format(column_str, table_name)
                s_cursor.execute(select_sql, (project_code, workflow_code))
                result_set = s_cursor.fetchall()
                # Columns 1 and 2 are project_code and process_definition_code.
                # Each distinct pair is deleted once, not once per row.
                delete_sql = (
                    "delete from {} where project_code=%s"
                    " and process_definition_code=%s"
                ).format(table_name)
                for pair in {(row[1], row[2]) for row in result_set}:
                    t_cursor.execute(delete_sql, pair)

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise

def migrate_t_ds_schedules(project_code, workflow_code=None):
    """Replace ``t_ds_schedules`` rows in the target with the source's.

    Args:
        project_code: Code of the project to migrate. Used only when
            ``workflow_code`` is None.
        workflow_code: Optional workflow (process definition) code. When
            given, only schedules of that workflow are migrated.

    Behaviour:
        * Project mode (``workflow_code`` is None): schedules are selected
          and deleted through a subquery on ``t_ds_process_definition``
          filtered by ``project_code``. The delete subquery runs against
          the *target* database, so it only matches workflows already
          present in the target's ``t_ds_process_definition``.
        * Workflow mode: target rows are deleted by
          ``process_definition_code`` only for codes found in the source,
          then the source rows are inserted.

    The delete and insert are committed together; on any error the target
    transaction is rolled back and the exception is re-raised.
    """
    table_name = 't_ds_schedules'
    column_str = 'process_definition_code,start_time,end_time,timezone_id,crontab,failure_strategy,user_id,release_state,warning_type,warning_group_id,process_instance_priority,worker_group,environment_code,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)
    # Shared filter: all workflows that belong to one project.
    project_filter = (
        "process_definition_code in "
        "(select code from t_ds_process_definition where project_code=%s)"
    )

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            if workflow_code is None:
                select_sql = "select {} from {} where {}".format(
                    column_str, table_name, project_filter)
                s_cursor.execute(select_sql, (project_code,))
                result_set = s_cursor.fetchall()
                # Values are passed as parameters so the driver escapes them.
                t_cursor.execute(
                    "delete from {} where {}".format(table_name, project_filter),
                    (project_code,))
            else:
                select_sql = "select {} from {} where process_definition_code=%s".format(
                    column_str, table_name)
                s_cursor.execute(select_sql, (workflow_code,))
                result_set = s_cursor.fetchall()
                # process_definition_code is the first selected column;
                # delete each distinct code once.
                delete_sql = "delete from {} where process_definition_code=%s".format(table_name)
                for code in {row[0] for row in result_set}:
                    t_cursor.execute(delete_sql, (code,))

            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            # Commit even when nothing was inserted so a project-mode delete
            # is not left pending on the shared connection.
            target_conn.commit()
    except Exception:
        # Never leave a delete without its insert in the open transaction.
        target_conn.rollback()
        raise



# Copy the project row itself from the source environment into the target.
def migrate_t_ds_project(project_code, workflow_code=None):
    """Replace the ``t_ds_project`` row for ``project_code`` in the target.

    Args:
        project_code: Code of the project whose row is copied.
        workflow_code: Unused; accepted so the signature matches the other
            ``migrate_*`` functions.

    The target row with this code is deleted and the source row(s) with the
    same code are inserted, all in one committed transaction. If the source
    has no such row, the target row is still removed and a notice is
    printed. On any error the target transaction is rolled back and the
    exception is re-raised.
    """
    table_name = 't_ds_project'
    column_str = 'name,code,description,user_id,flag,create_time,update_time'
    # One driver placeholder per copied column.
    val_str = ','.join(['%s'] * len(column_str.split(',')))
    insert_sql = "insert into {} ({}) values ({})".format(table_name, column_str, val_str)

    try:
        # Cursors are context managers, so they are closed even on failure.
        with source_conn.cursor() as s_cursor, target_conn.cursor() as t_cursor:
            # Values are passed as parameters so the driver escapes them.
            s_cursor.execute(
                "select {} from {} where code=%s".format(column_str, table_name),
                (project_code,))
            result_set = s_cursor.fetchall()

            t_cursor.execute(
                "delete from {} where code=%s".format(table_name),
                (project_code,))
            if result_set:
                t_cursor.executemany(insert_sql, result_set)
            else:
                print('%s 没有数据' % table_name)
            target_conn.commit()
    except Exception:
        # Never leave the delete without its insert in the open transaction.
        target_conn.rollback()
        raise







#######
def query_ds_project_code(project_name):
    """Look up a project's ``code`` in the source database by its name.

    Args:
        project_name: Value matched against ``t_ds_project.name``.

    Returns:
        The ``code`` column of the first matching row.

    Raises:
        SystemExit: With exit status 1, after printing a notice, when no
            project with that name exists in the source database.
    """
    # The name is bound as a parameter so quotes in it cannot break the SQL.
    # The ``with`` block closes the cursor on every path.
    with source_conn.cursor() as s_cursor:
        s_cursor.execute("select code from t_ds_project where name=%s", (project_name,))
        row = s_cursor.fetchone()

    if row is None:
        print(project_name+' 不存在,检查工程名')
        # Non-zero status marks the run as failed. SystemExit is raised
        # directly because the exit() builtin is only installed by ``site``.
        raise SystemExit(1)
    return row[0]

def query_ds_workflow_code(project_code, workflow_name=None):
    """Look up a workflow's ``code`` in the source database by its name.

    Args:
        project_code: Code of the project the workflow belongs to.
        workflow_name: Value matched against
            ``t_ds_process_definition.name``. ``None`` means "no specific
            workflow".

    Returns:
        ``None`` when ``workflow_name`` is None (no database access is
        made); otherwise the ``code`` column of the first matching row.

    Raises:
        SystemExit: With exit status 1, after printing a notice, when the
            named workflow does not exist in the project.
    """
    # Return before opening a cursor, so nothing is left unclosed.
    if workflow_name is None:
        return None

    # Values are bound as parameters so quotes in the name cannot break the
    # SQL. The ``with`` block closes the cursor on every path.
    with source_conn.cursor() as s_cursor:
        s_cursor.execute(
            "select code from t_ds_process_definition where project_code=%s and name=%s",
            (project_code, workflow_name))
        row = s_cursor.fetchone()

    if row is None:
        print(workflow_name + ' 不存在,检查工作流名称')
        # Non-zero status marks the run as failed. SystemExit is raised
        # directly because the exit() builtin is only installed by ``site``.
        raise SystemExit(1)
    return row[0]











def main(project_name, workflow_name=None):
    """Migrate one project, or one workflow of it, from source to target.

    Args:
        project_name: Name of the project; resolved to its code through
            ``query_ds_project_code``.
        workflow_name: Optional workflow name. When omitted, the resolved
            workflow code is None and each step receives None for it.
    """
    project_code = query_ds_project_code(project_name)
    workflow_code = query_ds_workflow_code(project_code, workflow_name)

    # Table-level steps, run in this order with the same two arguments.
    table_steps = (
        migrate_t_ds_task_definition,
        migrate_t_ds_task_definition_log,
        migrate_t_ds_process_definition,
        migrate_t_ds_process_definition_log,
        migrate_t_ds_process_task_relation,
        migrate_t_ds_process_task_relation_log,
        migrate_t_ds_schedules,
    )
    for step in table_steps:
        step(project_code, workflow_code)

    # The project row itself is copied last and takes only the project code.
    migrate_t_ds_project(project_code)

if __name__ == '__main__':
    # NOTE(review): '工程名' ("project name") looks like a placeholder --
    # replace it with the real project name before running.
    try:
        main('工程名')
    finally:
        # Close both connections even if the migration fails or exits early.
        try:
            source_conn.close()
        finally:
            target_conn.close()