一个查询数据库、解析其中 JSON 格式的数据,并在处理后保存到另一张表的 Python 程序

发布时间 2023-08-23 12:30:04 作者: 爱家家的卡卡
  1 import json
  2 import pymysql
  3 import datetime
  4 from decimal import Decimal
  5 from decouple import config
  6 
  7 
  8 # 获取每日汇率的方法
  9 def get_currency_rate(code):
 10     currency_db_host = config("CURRENCY_DB_HOST")
 11     currency_db_database = config("CURRENCY_DB_DATABASE")
 12     currency_db_username = config("CURRENCY_DB_USERNAME")
 13     currency_db_password = config("CURRENCY_DB_PASSWORD")
 14 
 15     connection = pymysql.connect(
 16         host=currency_db_host,
 17         database=currency_db_database,
 18         user=currency_db_username,
 19         password=currency_db_password
 20     )
 21 
 22     try:
 23         with connection.cursor() as cursor:
 24             sql = f"SELECT * FROM currency_rate WHERE r_code = '{code}' ORDER BY cdate DESC LIMIT 1"
 25             cursor.execute(sql)
 26             result = cursor.fetchone()
 27             return result
 28     finally:
 29         connection.close()
 30 
 31 
 32 # 获取数据表cai_niao_company_daily_snapshots的数据
 33 def get_data_table(db_host, db_database, db_username, db_password):
 34     connection = pymysql.connect(
 35         host=db_host,
 36         database=db_database,
 37         user=db_username,
 38         password=db_password
 39     )
 40 
 41     try:
 42         with connection.cursor() as cursor:
 43             today = datetime.datetime.now().strftime('%Y-%m-%d')
 44             sql = f"SELECT * FROM cai_niao_company_daily_snapshots WHERE download_date = '{today}' AND status = 1 ORDER BY created_at DESC LIMIT 1"
 45             cursor.execute(sql)
 46             result = cursor.fetchone()
 47             return result
 48     finally:
 49         connection.close()
 50 
 51 
 52 # 解析json数据并处理
 53 def process_json_data(exchange_rate, parsed_data, db_host, db_database, db_username, db_password):
 54     download_date = datetime.datetime.now().strftime('%Y-%m-%d')
 55     created_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 56     updated_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 57 
 58     # Connect to the database
 59     connection = pymysql.connect(
 60         host=db_host,
 61         database=db_database,
 62         user=db_username,
 63         password=db_password
 64     )
 65     print(parsed_data)
 66 
 67     try:
 68         for parsed_info in parsed_data:
 69             # 将json_shop_info转换为字符串
 70             json_shop_infos = json.loads(parsed_info["json_shop_infos"])
 71             json_shops = []
 72             for json_shop_info in json_shop_infos:
 73                 json_shops.append(
 74                     json_shop_info['shopName'] + '(' + json_shop_info['shopId'] + ')')
 75             json_shop_infos_str = "<br>".join(json_shops)
 76 
 77             # 准备SQL语句
 78             query = "INSERT INTO cai_niao_watermark_dailylogs (download_date, customer_id, customer_name, shop_num, json_shop_infos, currency, balance_amt, credit_total_line, waterlevel_status, stock_amt, stock_central_amt, receive_amt, cust_lvl_v2, cust_lvl_v3, head_trip_amt, gmt_modified, watermark, exchange_rate, status, created_at, updated_at, watermark_sw, watermark_cw, watermark_sw_usd, watermark_cw_usd) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
 79 
 80             # 启动事务
 81             with connection.cursor() as cursor:
 82                 # 执行SQL语句
 83                 cursor.execute(query, (
 84                     download_date,
 85                     parsed_info["customer_id"],
 86                     parsed_info["customer_name"],
 87                     parsed_info["shop_num"],
 88                     json_shop_infos_str,
 89                     parsed_info.get("currency", "-"),
 90                     Decimal(parsed_info["balance_amt"]) / 100,
 91                     Decimal(parsed_info["credit_total_line"]) / 100,
 92                     parsed_info["waterlevel_status"],
 93                     Decimal(parsed_info["stock_amt"]) / 100,
 94                     Decimal(parsed_info["stock_central_amt"]) / 100,
 95                     Decimal(parsed_info["receive_amt"]) / 100,
 96                     parsed_info.get("cust_lvl_v2", "-"),
 97                     parsed_info.get("cust_lvl_v3", "-"),
 98                     Decimal(parsed_info["head_trip_amt"]) / 100,
 99                     parsed_info["gmt_modified"],
100                     round(Decimal(parsed_info["credit_total_line"]) /
101                           100 / Decimal(exchange_rate) * 1000000) / 1000000,
102                     Decimal(exchange_rate),
103                     1,
104                     created_at,
105                     updated_at,
106                     Decimal(0),
107                     Decimal(0),
108                     Decimal(0),
109                     Decimal(0)
110                 ))
111 
112         # 提交事务
113         connection.commit()
114 
115     except Exception as e:
116         # 回滚事务
117         connection.rollback()
118         print("Failed to insert data:", e)
119 
120     finally:
121         # 关闭连接
122         connection.close()
123 
124 
# Main program
if __name__ == "__main__":
    # Look up the latest USDCNY row; index 2 of that row is used as the rate
    # (presumably the rate column of currency_rate -- verify against the schema).
    rate_row = get_currency_rate("USDCNY")
    if not rate_row:
        # Without a rate every insert would fail later with an opaque
        # Decimal conversion error, so stop here with a clear message.
        raise SystemExit("No USDCNY exchange rate found; nothing was processed.")
    exchange_rate = rate_row[2]

    # The same steps run for each environment; only the configuration prefix
    # (PRO, U01, U02) differs.
    for env_prefix in ("PRO", "U01", "U02"):
        db_host = config(f"{env_prefix}_DB_HOST")
        db_database = config(f"{env_prefix}_DB_DATABASE")
        db_username = config(f"{env_prefix}_DB_USERNAME")
        db_password = config(f"{env_prefix}_DB_PASSWORD")

        snapshot_row = get_data_table(
            db_host, db_database, db_username, db_password)
        if not snapshot_row:
            # No active snapshot for today in this environment.
            continue

        # Index 2 of the snapshot row holds the JSON payload that is parsed
        # and stored back into the same database.
        parsed_data = json.loads(snapshot_row[2])
        process_json_data(exchange_rate, parsed_data, db_host,
                          db_database, db_username, db_password)
166         parsed_data = json.loads(raw_data)
167         process_json_data(exchange_rate, parsed_data, u02_db_host,
168                           u02_db_database, u02_db_username, u02_db_password)