# Python 操作 SQLite
#
# 发布时间: 2023-09-03 11:46:53  作者: 百熟小生
import json
import sqlite3
import pandas as pd


class SqliteTool:
    """Convenience wrapper around a single SQLite connection.

    The connection is opened in ``__init__`` and uses ``sqlite3.Row`` as its
    row factory so every row can be converted to a ``dict`` keyed by column
    name.  Call :meth:`close` when done, or use the instance as a context
    manager (``with SqliteTool(path) as tool: ...``).

    NOTE: table and column names are interpolated into the SQL text as-is
    (SQLite placeholders can bind values, not identifiers), so they must come
    from trusted code, never from end-user input.
    """

    def __init__(self, db_path: str):
        """Open (or create) the database file at *db_path*."""
        self.db_path = db_path
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the tool itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection; exceptions are not suppressed."""
        self.close()
        return False

    def is_exist_table(self, table_name: str) -> int:
        """Return 1 if a table named *table_name* exists, otherwise 0.

        A message is printed when the table does not exist.
        """
        # The name is bound as a value, so names containing quotes are safe.
        sql = "select count(*) from sqlite_master where type='table' and name=?"
        row = self.cursor.execute(sql, (table_name,)).fetchone()
        count = row[0] if row is not None else 0
        if count == 0:
            print(f"table_name={table_name}不存在!")
        return count

    def get_tables(self) -> list:
        """Return the names of all tables in the database, sorted by name."""
        rows = self.cursor.execute(
            "select name from sqlite_master where type='table' order by name;"
        )
        tables = [dict(row).get("name") for row in rows]
        print(f"数据库中总共有表:{len(tables)}张,tables={tables}")
        return tables

    def _build_select(self, table_name, fields):
        """Return ``(sql, fields)`` selecting *fields* (or every column) of a table."""
        if not fields:
            # No explicit selection: read every column and report their names.
            return f"select * from {table_name}", self.get_fields(table_name)
        fields_str = ",".join(fields)
        return f"select {fields_str} from {table_name}", fields

    def get_one_table_data(self, table_name: str, fields=None) -> dict:
        """Read rows of *table_name*.

        @table_name: table to read
        @fields: column names to fetch; ``None`` or an empty list means all columns

        Returns ``{"fields": [...], "data_list": [[...], ...]}`` where each
        inner list holds one row's values in the order of ``fields``.  A
        requested field missing from the result row yields ``None``.
        """
        sql, fields = self._build_select(table_name, fields)
        print(f"需要获取的字段有:{fields}")
        data_list = []
        for row in self.cursor.execute(sql):
            record = dict(row)
            data_list.append([record.get(field) for field in fields])
        return {"fields": fields, "data_list": data_list}

    def delete_table(self, table_name: str):
        """Drop *table_name* if it exists.

        Returns ``None`` on success, or an error-message string on failure.
        """
        try:
            self.cursor.execute(f"drop table if exists {table_name}")
            # Commit so the drop is durable even if a transaction was open.
            self.conn.commit()
        except Exception as e:  # error is reported via the return value
            return f"删除表失败,error={e}"
        return None

    def execute_sql(self, sql: str):
        """Execute one SQL statement and commit it.

        Returns ``None`` on success, or an error-message string on failure.
        """
        try:
            self.cursor.execute(sql)
            # Without a commit, INSERT/UPDATE/DELETE changes are rolled back
            # when the connection is closed.
            self.conn.commit()
        except Exception as e:  # error is reported via the return value
            return f"execute_sql fail error={e}"
        return None

    def get_fields(self, table_name: str) -> list:
        """Return the column names of *table_name* in declaration order."""
        rows = self.cursor.execute(f"PRAGMA table_info([{table_name}])")
        return [dict(row).get("name") for row in rows]

    def data2json(self, table_name: str, json_file: str, fields=None):
        """Dump *table_name* to *json_file* as JSON Lines (one object per row).

        @table_name: table to read
        @json_file: output path, written as UTF-8
        @fields: column names to export; ``None`` or empty means all columns

        Failures are printed rather than raised.
        """
        try:
            sql, fields = self._build_select(table_name, fields)
            print(f"需要获取的字段有:{fields}")
            rows = self.cursor.execute(sql)
            # ``with`` guarantees the file is closed even if a row fails to
            # serialise part-way through.
            with open(json_file, "w", encoding="utf-8") as f:
                for row in rows:
                    f.write(json.dumps(dict(row), ensure_ascii=False) + "\n")
        except Exception as e:
            print(f"数据存入json文件失败!,e={e}")

    def data2excel(self, table_name: str, excel_path: str, fields=None):
        """Export *table_name* to an Excel file.

        @table_name: table to read
        @excel_path: path of the Excel file to write
        @fields: column names to export; ``None`` or empty means all columns

        Nothing is written when the table does not exist.  Failures are
        printed rather than raised.
        """
        try:
            if self.is_exist_table(table_name) != 1:
                return
            result = self.get_one_table_data(table_name, fields)
            # Column headers come from ``columns=``; the rows must contain
            # data only, otherwise the sheet gets a spurious first row.
            df = pd.DataFrame(result["data_list"], columns=result["fields"])
            df.to_excel(excel_path)
        except Exception as e:
            print(f"数据存入excel失败!,e={e}")

    def run(self):
        """Placeholder entry point for ad-hoc experiments; currently a no-op."""

    def close(self):
        """Close the cursor and the underlying connection."""
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
    # Demo: list the tables of a local database and export one of them to Excel.
    # Raw string: "\g" and "\k" are not valid escape sequences, so a plain
    # literal triggers an invalid-escape warning; the resulting path is the same.
    db_path = r"D:\gk\kuaishou.db"
    table_name = "kuaishou_data"
    excel_path = "1.xlsx"
    s = SqliteTool(db_path=db_path)
    try:
        s.get_tables()
        s.data2excel(table_name, excel_path)
        s.run()
    finally:
        # Release the connection even if one of the steps above raises.
        s.close()