Python SAP 脚本定时自动下载资产清单 S_ALR_87011990

发布时间 2023-08-24 22:14:04作者: CrossPython

业务场景

使用了外部工作流系统管理固定资产申请,转移(负责人变更), 盘点,报废等涉及固定资产的业务,而固定资产采购,折旧等仍在SAP中进行,所以需要定时从SAP中下载包括固定资产账面值的固定资产清单,以作为工作流审批节点流转的依据

主要功能说明

  1. 定时运行
  2. 自动登录SAP
  3. 下载SAP固定资产报表 S_ALR_87011990,(需在SAP中提前设置好报表输出格式layout)
  4. 下载成跳格分享的csv文本文件
  5. 进行数据格式处理:日期格式转换,数字格式转换
  6. 添加目前数据库的字段
  7. 写入目标数据库
  8. 执行目录数据库中的后处理SQL语句

附注:每次运行记录数8000多笔,总耗时约1分钟,总体性能可接受

 

import os,win32com.client
import time,csv
from datetime import datetime
import datetime as datetime1
import schedule
from utils import connect_db, close_db,get_configer,get_logger, timestamp
from sap_utils import SAP,_get_sap_session,close_sap,get_sap_session, send_email
from pprint import pprint

cf =get_configer('asset_interface.conf')
logger = get_logger('asset_interface.log')
if cf.has_option('transaction', 'testing') and cf.getboolean('transaction', 'testing'):
    testing_flag = True
else:
    testing_flag = False

def execute_transaction(session, conn, cursor):
    session.findById("wnd[0]/tbar[0]/okcd").Text = cf.get('transaction','tcode')  #"/nS_ALR_87011990"
    session.findById("wnd[0]").sendVKey(0)
    session.findById("wnd[0]/usr/radXEINZEL").Select()
    session.findById("wnd[0]/usr/ctxtBUKRS-LOW").Text = cf.get('transaction','company') 
    session.findById("wnd[0]/usr/ctxtBEREICH1").Text = "60"
    session.findById("wnd[0]/usr/ctxtSRTVR").Text = cf.get('transaction','sort_variant') #"0002"
 
    sap = SAP(session)
    date_format = sap.get_date_format("wnd[0]/usr/ctxtBERDATUM")  #dmY
    format_str = '%' + '%'.join(date_format) #'%m%d%Y'
    next_month = datetime.today().replace(day=28) + datetime1.timedelta(days=4)
    end_of_month = (next_month - datetime1.timedelta(days=next_month.day)).strftime(format_str)
 try:
        default_date = session.findById("wnd[0]/usr/ctxtBERDATUM").Text
        session.findById("wnd[0]/usr/ctxtBERDATUM").Text = end_of_month
    except:
        session.findById("wnd[0]/usr/ctxtBERDATUM").Text = default_date
 
    session.findById("wnd[0]/tbar[1]/btn[8]").press()
    #session.findById("wnd[0]/mbar/menu[0]/menu[1]/menu[1]").Select()
    session.findById("wnd[0]/mbar/menu[0]/menu[1]/menu[2]").select()
    session.findById("wnd[1]/usr/subSUBSCREEN_STEPLOOP:SAPLSPO5:0150/sub:SAPLSPO5:0150/radSPOPLI-SELFLAG[1,0]").select()  
    session.findById("wnd[1]/tbar[0]/btn[0]").press()
    full_file_name = save_csv(session, 'asset_list_report')
 print('full_file_name=', full_file_name)
 if full_file_name:        
        data = get_data_from_file(full_file_name, date_format)
 
    cursor.execute ("delete from app_fd_F01_AssetMaster")
    db_fields =['id',
 'c_AssetNo',
 'c_SubNumber',
 'c_AssetClass',
 'c_AssetDescription',
 'c_WBS',
 'c_Plant',
 ' c_CostCenter',
 'c_AssetOwnerNo',
 'c_CapitalizedDate',
 'c_DeactivationDate',
 'c_Currency',
 'c_CurrBkVal',
 'c_CurrentAPC',
 'c_AccumulDep',
 'dateCreated',
 'createdBy']
    s = "INSERT into app_fd_F01_AssetMaster (%s) VALUES (%s) " %(','.join(db_fields), ','.join(['?']*len(db_fields)))    
    record_value = []
    total_row = len(data)
    total_records_updated = 0
 for (j, row) in enumerate(data):
        imod = j % 1000
 if row[0] and row[11]:            
            record_value.append(row)
            total_records_updated += 1
 if record_value and (j == total_row -1 or (j and imod == 0)):
            cursor.executemany(s, record_value)
            cursor.commit()
            record_value =[]            
 return total_records_updated

def save_csv(session,tcode, file_folder=None):
 if file_folder:
        session.findById("wnd[1]/usr/ctxtDY_PATH").text = file_folder 
 else:
        file_folder = session.findById("wnd[1]/usr/ctxtDY_PATH").text
 
    file_name = f"{tcode}_{datetime.now():%y%m%d_%H%M%S}.csv"
    session.findById("wnd[1]/usr/ctxtDY_FILENAME").text = file_name
 
    session.findById("wnd[1]/tbar[0]/btn[0]").press()
    result = None 
    full_file_name = os.path.join(file_folder, file_name)
    time.sleep(1)    
 for i in range(720):
 if os.path.exists(full_file_name):
            result = full_file_name
 break
        time.sleep(1) 
 
 return result

def get_data_from_file(file_name, date_format):
    data = []
    with open(file_name, "r", newline='', encoding='unicode_escape') as csvfile:
        rows = csv.reader(csvfile, delimiter = '\t')
        rows = [r for r in rows]

    columns = rows[6]                                               #获取第7行标题列
    valid_columns = [c for c in columns if c]                       #剔除空字段,获取非空标题字段,解决字段间多个tab的情况
    target_column_count = len(valid_columns)
 for i, row in enumerate(rows):                
 if i > 7 and row and len(row) > 4:                          #剔除空行,结尾标记行,
            row = [c for idx, c in enumerate(row) if columns[idx]]  #剔除空列标题对应的字段值
            col_count = len(row)            
 if col_count < target_column_count:                     #补齐最后几个空列,最后几列无内容时,也没有tab分隔符,
                row += [None for j in range(target_column_count - col_count)]                        
            data.append(row)
 
    fields = cf.get('transaction','fields')             #从配置文件中获取字段清单
    fields = fields.split(',')
 if len(valid_columns) < len(fields):
        logger.info('missing Fields in sap layout')        
 return
    #字段名顺序匹配,导出时因字段名输出长度不一致,会有短,中,长三种标签输出
    # col_check = [c for (i, c) in enumerate(fields) if c != valid_columns[i]]
    # if col_check:
    #     logger.info('Fields sequence should be same as in asset_interface.conf file, field index')        
    #     return
    result = []
    ymd_pos = get_ymd_pos(date_format)
 for row in data:        
 for column_idx, value in enumerate(row):
 if not value: continue
 if 8<= column_idx <=9:         #日期字段处理:根据SAP用户格式转换成 yyyy/mm/dd格式
                row[column_idx]  = convert_date(value, ymd_pos)           
            elif 11<= column_idx <=13:       #数字字段处理,去掉千分位分隔符,去掉首尾空格,将末尾负号移至最前面
                value = value.strip().replace(',','')
                value = f"-{value[:-1]}" if value[-1] == '-' else value
                row[column_idx]  = value
 if column_idx == 11 and value == '0.00':
                row[column_idx]  = 0 
        row.extend([datetime.now().strftime('%Y/%m/%d %H:%M:%S'),os.environ['username']])    #添加时间戳和当前用户
        row.insert(0,row[0])                #将资产号作为ID
        result.append(row)
    pprint('get_data_from_file 2 records %s' % result[:2])            
 return result

def get_ymd_pos(date_format):
 """根据格式化字符串,解析年、月、日位置,
        date_format: 如dmY, mdY,Ymd
        返回{'Y':(6,10),
             'm':(3,5),
            'd':(0,2)
    """
    start = 0
    ymd_pos = {}
 for k in date_format:
        begin = start
        length = 3 if k in ['m','d'] else 5
        start += length
        ymd_pos[k] = (begin, start - 1)
 return ymd_pos

def convert_date(date_str, ymd_pos):
 """从日期字符串中按位置取出年、月、日,再按 固定年/月/日格式字符串返回"""
    pos = ymd_pos.get('Y')
    y = date_str[pos[0]: pos[1]]
    pos = ymd_pos.get('m')
    m = date_str[pos[0]: pos[1]]
    pos = ymd_pos.get('d')
    d = date_str[pos[0]: pos[1]]
 return f"{y}/{m}/{d}"

def job():
    post_sql_commands=[
 """
    执行后处理的SQL语句
    """
    ]
    session = conn = 0 
 try:
 print('%s started running the job...' % datetime.now())    
        short_cut_file =cf.get('saplogon','short_cut_file')
        popup_win_title=cf.get('saplogon','popup_win_title')
        pin =cf.get('saplogon','pin')
        wait_sec =cf.get('saplogon','wait_sec')
 if not testing_flag:
            session, msg = get_sap_session(short_cut_file, popup_win_title, pin, wait_sec)
 else:
            session, msg = _get_sap_session()        
 if session:            
            conn, cursor = connect_db(cf.get('db','ip'), cf.get('db','db'))            
            total_records_updated = execute_transaction(session, conn, cursor) 
 for sql in post_sql_commands:
                cursor.execute(sql)
 print('%s records updated' % cursor.rowcount)                            
            send_email('%s Asset updated' % total_records_updated, "admin@abc.om")
 else:
            send_email("Failed logon SAP %s" % msg, 'admin@abc.om')
 print('%s finished running the job...' % datetime.now())                
    except Exception as e:
        send_email("Asset interface run with error %s" % str(e))
        raise
 finally:
 if not testing_flag:        
            close_db(conn)    
            close_sap(session)        
 
def main():
 print('started..')
    run_at=cf.get('schedule','RunAt')
    #mailto=cf.get('mail','MailTo')    
    runat = run_at.split(';')
 for r in runat:
        schedule.every().day.at(r).do(job)
 print('%s waiting for pending job at %s' %(datetime.now(),runat))    
 while True:
        schedule.run_pending()
        time.sleep(1)
 
if __name__ == "__main__":
    job()
    main()