python读取Oracle相关表生成sqlldr脚本文件

发布时间: 2023-03-24 09:24:45  作者: 洺剑残虹
# -*- coding: utf-8 -*-
import os
import pandas as pd
import cx_Oracle as cx
# os.environ['path'] =  r'D:\Program Files\plsql developer8.0\instantclient_19_12'
# Make the bundled Oracle Instant Client (OCI libraries) discoverable for cx_Oracle.
# Prepend it to PATH instead of replacing PATH: overwriting the variable hides
# every other directory from this process, and on POSIX the lowercase key
# 'path' is a different variable from 'PATH'.
# NOTE(review): on Linux the OCI libraries are usually located via
# LD_LIBRARY_PATH / cx_Oracle.init_oracle_client(lib_dir=...) — confirm for
# the target deployment.
os.environ['PATH'] = os.path.abspath('./instantclient_19_12') + os.pathsep + os.environ.get('PATH', '')

class create_sqlldr_ctl:
    """Generate SQL*Loader launch scripts and control files from column metadata.

    For every table in the metadata (rows with ``TABLE_NAME``, ``COLUMN_NAME``
    and ``DATA_TYPE``) this writes, under ``out_ctl_path``:

    * ``<name>.sh``                 -- the ``sqlldr`` command line,
    * ``ctl/<name>.ctl``            -- the control file, one field per column,
    * ``bad/LOAD_<name>.bad`` and ``log/LOAD_<name>.log`` -- empty files,

    where ``<name>`` is the table name with ``'GNS_'`` removed plus ``'_D_ADD'``.
    """

    def __init__(self, sql: str, out_ctl_path: str, ctl_data_path: str, seq: str,
                 user: str = 'xxx', passwd: str = 'xxx',
                 url: str = 'xx.xx.xx.xx:1521/xx'):
        """
        :param sql: metadata query returning TABLE_NAME / COLUMN_NAME / DATA_TYPE
            rows (e.g. from USER_TAB_COLUMNS), ordered by table and column.
        :param out_ctl_path: local directory the generated files are written to;
            ``ctl/``, ``log/`` and ``bad/`` sub-directories are created inside it.
        :param ctl_data_path: path prefix used inside the generated scripts. It is
            concatenated as a plain string, so it should end with a separator
            (e.g. ``'/data/'``).
        :param seq: field separator written into ``FIELDS TERMINATED BY``.
        :param user: Oracle user name (placeholder by default).
        :param passwd: Oracle password (placeholder by default).
        :param url: Oracle Easy Connect string, ``host:port/service``.
        """
        self.table_columns_sql = sql
        self.out_ctl_path = os.path.abspath(out_ctl_path)
        self.ctl_data_path = ctl_data_path
        self.seq = seq
        # Set by _pd_connDB: True once the metadata query has succeeded.
        self.ft = False

        # Oracle connection settings.
        self.user = user
        self.passwd = passwd
        # Easy Connect syntax separates host and port with a colon.
        self.url = url

        # The templates below are filled later with str.format(); escape braces
        # in the data path so they are emitted literally instead of breaking format().
        data_root = ctl_data_path.replace('{', '{{').replace('}', '}}')

        # .sh command-line pieces. NOTE: the password is written in plain text.
        self.sh_txt = 'sqlldr userid=' + self.user + '/' + self.passwd + '@//' + self.url
        self.control = ' control=' + data_root + 'cmdpath/ctl/{}.ctl'
        self.bad = ' bad=' + data_root + 'cmdpath/bad/LOAD_{}.bad'
        self.log = ' log=' + data_root + 'cmdpath/log/LOAD_{}.log errors=99999999'

        # .ctl header; format placeholders are (file prefix, table name, separator).
        # Record terminator x'73747258274127422743270A' decodes to "strX'A'B'C'" + LF
        # (the alternative x'737472274127422743270A' decodes to "str'A'B'C'" + LF).
        # direct=TRUE is not set in the options line.
        self.ctl_txt = (
            "options(parallel=TRUE,bindsize=20480000,readsize=51200000,errors=0,rows=60000)\n"
            "LOAD DATA\n"
            "CHARACTERSET AL32UTF8\n"
            "INFILE '" + data_root + "workpath/"
            "{}_YYYYMMDD_000001.del'  \"str x'73747258274127422743270A'\"  \n"
            "DISCARDMAX 999999999999\n"
            "TRUNCATE INTO TABLE {}\n"
            "FIELDS TERMINATED BY '{}'\n"
            "TRAILING NULLCOLS\n"
            "(\n"
        )

    def _del_all_file(self, path: str) -> None:
        """Recursively delete every file below ``path``; directories are kept."""
        for entry in os.listdir(path):
            c_path = os.path.join(path, entry)
            if os.path.isdir(c_path):
                self._del_all_file(c_path)
            else:
                os.remove(c_path)

    def __delete__(self) -> None:
        """Empty ``out_ctl_path`` and ensure its ctl/, log/ and bad/ sub-directories exist.

        NOTE(review): ``__delete__`` is the descriptor-protocol hook name. It is
        harmless only because instances are not stored as class attributes.
        The name is kept so any external callers keep working.
        """
        dir_path = self.out_ctl_path
        # Remove files left over from a previous run; sub-directories survive.
        if os.path.isdir(dir_path):
            self._del_all_file(dir_path)
        # makedirs also creates out_ctl_path itself when missing. A plain
        # os.mkdir('<missing>/ctl') would raise FileNotFoundError.
        for sub in ('ctl', 'log', 'bad'):
            os.makedirs(os.path.join(dir_path, sub), exist_ok=True)

    def _pd_connDB(self):
        """Run ``self.table_columns_sql`` against Oracle and return a DataFrame.

        Sets ``self.ft`` to True on success. On any error the exception is
        printed, ``self.ft`` is set to False and None is returned.
        """
        try:
            db = cx.connect(self.user, self.passwd, self.url)
            try:
                df: pd.DataFrame = pd.read_sql_query(self.table_columns_sql, db)
            finally:
                # Release the connection even when the query fails.
                db.close()
        except Exception as e:  # best-effort: create__ctl reports the failure
            print(e)
            self.ft = False
            return None
        self.ft = True
        return df

    def _write_sh_files(self, ftp_name: str) -> None:
        """Write ``<ftp_name>.sh`` and the empty bad/log files for one table."""
        sh_txt = (self.sh_txt + self.control.format(ftp_name)
                  + self.bad.format(ftp_name) + self.log.format(ftp_name))
        with open(os.path.join(self.out_ctl_path, ftp_name + '.sh'), 'w', encoding='utf-8') as f:
            f.write(sh_txt)
        # Empty local files with the same names as the bad/log paths in the command line.
        for sub, ext in (('bad', '.bad'), ('log', '.log')):
            with open(os.path.join(self.out_ctl_path, sub, 'LOAD_' + ftp_name + ext),
                      'w', encoding='utf-8'):
                pass

    def _build_ctl_text(self, tb_name: str, ftp_name: str, tb_cols: pd.DataFrame) -> str:
        """Return the complete .ctl text for one table.

        Columns whose DATA_TYPE is exactly 'DATE' are loaded with
        TO_DATE(TRIM(:col),'yyyy-mm-dd hh24:mi:ss'). Every other column
        (TIMESTAMP included) is loaded as TRIM(:col). Field lines are
        comma-separated, with no comma after the last one.
        """
        fields = []
        for col, dtype in zip(tb_cols['COLUMN_NAME'], tb_cols['DATA_TYPE']):
            if dtype == 'DATE':
                fields.append(col + '''    "TO_DATE(TRIM(:{}),'yyyy-mm-dd hh24:mi:ss')"'''.format(col))
            else:
                fields.append(col + '    "TRIM(:{})"'.format(col))
        header = self.ctl_txt.format(ftp_name, tb_name, self.seq)
        body = (',\n'.join(fields) + '\n') if fields else ''
        return header + body + ')'

    def create__ctl(self, columns: 'pd.DataFrame | None' = None) -> None:
        """Generate the .sh/.ctl/.bad/.log files for every table.

        :param columns: optional, already-fetched column metadata with
            TABLE_NAME, COLUMN_NAME and DATA_TYPE columns. When omitted it is
            queried from Oracle with ``self.table_columns_sql``. If that query
            fails, '连接失败' ("connection failed") is printed and nothing is
            touched on disk.
        """
        if columns is None:
            columns = self._pd_connDB()
            if not self.ft:
                print('连接失败')
                return

        # Clear the previous output only once the metadata is available, so a
        # failed query does not wipe scripts generated by an earlier run.
        self.__delete__()

        # sort=False keeps tables in first-appearance order and rows within each
        # table in their original order (the query's ORDER BY). The result does
        # not depend on the DataFrame's index labels.
        for tb_name, tb_cols in columns.groupby('TABLE_NAME', sort=False):
            ftp_name = tb_name.replace('GNS_', '') + '_D_ADD'  # alternative suffix: '_D_ALL'
            self._write_sh_files(ftp_name)
            with open(os.path.join(self.out_ctl_path, 'ctl', ftp_name + '.ctl'),
                      'w', encoding='utf-8') as f:
                f.write(self._build_ctl_text(tb_name, ftp_name, tb_cols))
            print('{}.sh/{}.ctl/LOAD_{}.bad/LOAD_{}.log 文件创建成功'.format(ftp_name, ftp_name, ftp_name, ftp_name))



if __name__ == '__main__':
    # Column metadata for the tables to generate loaders for. 'table_name' is
    # presumably a placeholder to be replaced with real (upper-case) table names.
    columns_query = '''SELECT * FROM USER_TAB_COLUMNS T WHERE T.TABLE_NAME IN('table_name') ORDER BY TABLE_NAME,COLUMN_ID ASC '''

    # Scripts go to ./cmdpath locally and reference files under /data/ on the load host.
    generator = create_sqlldr_ctl(
        sql=columns_query,
        out_ctl_path='./cmdpath',
        ctl_data_path='/data/',
        seq='|@|',
    )
    generator.create__ctl()