一、判断数据表是否存在
在创建数据表之前,先判断要创建的数据表在数据库中是否已经存在。
def table_exists(self, table_name):
    """Return whether a table named ``table_name`` exists in the database.

    Opens a connection with ``self.connect_to_database()``, runs
    ``SHOW TABLES LIKE`` and always releases the cursor and the connection
    again, even when the query raises.

    Args:
        table_name: Name of the table to look for. It is matched literally.

    Returns:
        True if the query returned a row, False otherwise.
    """
    # LIKE treats "_" and "%" as wildcards, so an unescaped "table_name2"
    # would also match e.g. "tableXname2". Escape them (and the escape
    # character itself) to get an exact-name match.
    pattern = (
        str(table_name)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    self.connect_to_database()
    try:
        cursor = self.connection.cursor()
        try:
            # Pass the pattern as a query parameter so the driver quotes it;
            # never splice it into the SQL text.
            cursor.execute("SHOW TABLES LIKE %s", (pattern,))
            return cursor.fetchone() is not None
        finally:
            cursor.close()
    finally:
        self.disconnect_from_database()
二、创建数据表
根据字典的键创建数据表:每个键对应一列,列类型统一为 TEXT。
def create_table(self, table_name, data_list):
    """Create ``table_name`` with one TEXT column per distinct dict key.

    Column names are collected from every dict in ``data_list`` in
    first-seen order, so the resulting column order is deterministic.
    The statement uses ``CREATE TABLE IF NOT EXISTS``.

    Args:
        table_name: Name of the table to create. It is backtick-quoted as a
            whole, i.e. treated as a single identifier.
        data_list: Iterable of dicts whose keys become the column names.

    Raises:
        ValueError: If ``data_list`` yields no keys at all, because a table
            cannot be created without columns.
    """

    def quote(identifier):
        # Backtick-quote an identifier; embedded backticks are doubled.
        return "`{}`".format(str(identifier).replace("`", "``"))

    # dict.fromkeys de-duplicates while keeping first-seen order
    # (a set would shuffle the columns between runs).
    column_names = list(dict.fromkeys(key for row in data_list for key in row))
    if not column_names:
        raise ValueError("data_list must provide at least one key to create columns from")

    column_defs = ", ".join(f"{quote(name)} TEXT" for name in column_names)
    create_table_sql = (
        f"CREATE TABLE IF NOT EXISTS {quote(table_name)} ({column_defs})"
    )

    self.connect_to_database()
    try:
        cursor = self.connection.cursor()
        try:
            cursor.execute(create_table_sql)
            print('数据表创建成功')
        finally:
            cursor.close()
    finally:
        self.disconnect_from_database()
三、插入数据
逐条判断数据是否已存在(按 name 和 age 匹配):已存在则跳过,不存在则插入。
def insert_data(self, table_name, data_list, unique_keys=('name', 'age')):
    """Insert each dict of ``data_list`` as a row unless it already exists.

    A row counts as existing when the table already holds a row whose
    ``unique_keys`` columns equal the values of the new row. All inserts are
    committed together after the last row; the cursor and connection are
    released even when a statement raises.

    Args:
        table_name: Target table. It is backtick-quoted as a whole, i.e.
            treated as a single identifier.
        data_list: Iterable of dicts mapping column name to value.
        unique_keys: Columns used for the duplicate check. Defaults to
            ``('name', 'age')``. Pass an empty sequence to skip the check
            and insert every row.

    Raises:
        KeyError: If a row lacks one of ``unique_keys``.
    """

    def quote(identifier):
        # Backtick-quote an identifier; embedded backticks are doubled.
        # "%" is doubled as well because the driver applies %-formatting to
        # the statement whenever query parameters are supplied.
        escaped = str(identifier).replace("`", "``").replace("%", "%%")
        return f"`{escaped}`"

    table = quote(table_name)
    unique_keys = tuple(unique_keys)
    # "<=>" is MySQL's NULL-safe equality, so rows holding NULL (None) in a
    # key column are still recognised as duplicates.
    where = " AND ".join(f"{quote(key)} <=> %s" for key in unique_keys)
    select_sql = f"SELECT 1 FROM {table} WHERE {where} LIMIT 1"

    self.connect_to_database()
    try:
        cursor = self.connection.cursor()
        try:
            for data in data_list:
                if unique_keys:
                    # Values travel as query parameters, never as SQL text.
                    cursor.execute(select_sql, tuple(data[key] for key in unique_keys))
                    if cursor.fetchone() is not None:
                        print(f"数据已存在: {data}")
                        continue
                columns = ", ".join(quote(key) for key in data)
                placeholders = ", ".join(["%s"] * len(data))
                insert_sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
                cursor.execute(insert_sql, tuple(data.values()))
                print(f"插入数据: {data}")
            self.connection.commit()
        finally:
            cursor.close()
    finally:
        self.disconnect_from_database()
四、完整代码
# -*- coding:utf-8 -*-
import pymysql
class MySQL(object):
    """Small pymysql helper that creates a table from dict keys and fills it.

    Every public operation opens its own connection through
    ``connect_to_database`` and closes it again when done, including when a
    statement raises.
    """

    def __init__(self, host='localhost', user='root', password='Ansight@123',
                 database='reptile_text', port=3306):
        """Store the connection settings; no connection is opened yet.

        The defaults reproduce the values that used to be hard-coded in
        ``connect_to_database``.
        NOTE(review): credentials should come from configuration or the
        environment rather than from source code.
        """
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.port = port
        self.connection = None

    @staticmethod
    def _quote_identifier(name):
        """Backtick-quote a table or column name, doubling embedded backticks."""
        return "`{}`".format(str(name).replace("`", "``"))

    def connect_to_database(self):
        """Open a MySQL connection and store it on ``self.connection``."""
        self.connection = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            port=self.port,
        )

    def disconnect_from_database(self):
        """Close the current connection, if any, and forget it.

        ``self.connection`` is reset to None so that calling this method
        twice does not try to close an already-closed connection.
        """
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def create_table(self, table_name, data_list):
        """Create ``table_name`` with one TEXT column per distinct dict key.

        Column names are collected from every dict in ``data_list`` in
        first-seen order, so the column order is deterministic. The
        statement uses ``CREATE TABLE IF NOT EXISTS``.

        Args:
            table_name: Name of the table to create (quoted as a single
                identifier).
            data_list: Iterable of dicts whose keys become the column names.

        Raises:
            ValueError: If ``data_list`` yields no keys at all.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order
        # (a set would shuffle the columns between runs).
        column_names = list(dict.fromkeys(key for row in data_list for key in row))
        if not column_names:
            raise ValueError("data_list must provide at least one key to create columns from")

        column_defs = ", ".join(
            f"{self._quote_identifier(name)} TEXT" for name in column_names
        )
        create_table_sql = (
            f"CREATE TABLE IF NOT EXISTS {self._quote_identifier(table_name)} "
            f"({column_defs})"
        )

        self.connect_to_database()
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(create_table_sql)
                print('数据表创建成功')
            finally:
                cursor.close()
        finally:
            self.disconnect_from_database()

    def table_exists(self, table_name):
        """Return whether a table named ``table_name`` exists in the database.

        Args:
            table_name: Name of the table to look for. It is matched literally.

        Returns:
            True if ``SHOW TABLES LIKE`` returned a row, False otherwise.
        """
        # LIKE treats "_" and "%" as wildcards, so an unescaped "table_name2"
        # would also match e.g. "tableXname2". Escape them (and the escape
        # character itself) to get an exact-name match.
        pattern = (
            str(table_name)
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        self.connect_to_database()
        try:
            cursor = self.connection.cursor()
            try:
                # The pattern is a query parameter, so the driver quotes it.
                cursor.execute("SHOW TABLES LIKE %s", (pattern,))
                return cursor.fetchone() is not None
            finally:
                cursor.close()
        finally:
            self.disconnect_from_database()

    def insert_data(self, table_name, data_list, unique_keys=('name', 'age')):
        """Insert each dict of ``data_list`` as a row unless it already exists.

        A row counts as existing when the table already holds a row whose
        ``unique_keys`` columns equal the values of the new row. All inserts
        are committed together after the last row.

        Args:
            table_name: Target table (quoted as a single identifier).
            data_list: Iterable of dicts mapping column name to value.
            unique_keys: Columns used for the duplicate check. Defaults to
                ``('name', 'age')``. Pass an empty sequence to skip the
                check and insert every row.

        Raises:
            KeyError: If a row lacks one of ``unique_keys``.
        """

        def quoted(name):
            # "%" is doubled because the driver applies %-formatting to the
            # statement whenever query parameters are supplied.
            return self._quote_identifier(name).replace("%", "%%")

        table = quoted(table_name)
        unique_keys = tuple(unique_keys)
        # "<=>" is MySQL's NULL-safe equality, so rows holding NULL (None)
        # in a key column are still recognised as duplicates.
        where = " AND ".join(f"{quoted(key)} <=> %s" for key in unique_keys)
        select_sql = f"SELECT 1 FROM {table} WHERE {where} LIMIT 1"

        self.connect_to_database()
        try:
            cursor = self.connection.cursor()
            try:
                for data in data_list:
                    if unique_keys:
                        # Values travel as query parameters, never as SQL text.
                        cursor.execute(
                            select_sql, tuple(data[key] for key in unique_keys)
                        )
                        if cursor.fetchone() is not None:
                            print(f"数据已存在: {data}")
                            continue
                    columns = ", ".join(quoted(key) for key in data)
                    placeholders = ", ".join(["%s"] * len(data))
                    insert_sql = (
                        f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
                    )
                    cursor.execute(insert_sql, tuple(data.values()))
                    print(f"插入数据: {data}")
                self.connection.commit()
            finally:
                cursor.close()
        finally:
            self.disconnect_from_database()

    def main(self):
        """Demo run: create ``table_name2`` if it is missing, then insert sample rows."""
        table_name = 'table_name2'
        data_list = [
            {'name': 'mo', 'age': '20'},
            {'name': '墨子', 'age': '25'},
            {'name': '遗忘', 'age': '10'},
            {'name': '墨', 'age': '19'},
            {'name': '遗', 'age': '09'},
            {'name': '忘', 'age': '54'},
        ]
        if self.table_exists(table_name):
            print('数据表已存在')
        else:
            print('数据表不存在')
            self.create_table(table_name, data_list)
        # Rows are inserted in both cases; existing rows are skipped.
        self.insert_data(table_name, data_list)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    MySQL().main()