PyMySQL及数据库连接池

发布时间 2023-06-21 15:21:38作者: 守护式等待

1 PyMySQL及数据库连接池

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,由于频繁连接数据库很耗时,因此将 PyMySQL 对数据库的一系列操作封装到一个类中,实现连接一次数据库就可以完成多次操作,以提高性能。

2 普通的数据库连接

import pymysql


class SQLHelper(object):
    """
    PyMySQL操作数据库
    """
    def __init__(self):
        """
        在实例化对象时自动连接数据库
        """
        # 也可以根据需要将连接数据库的参数放到配置文件中,然后在初始化时读取
        self.connect()

    def open(self):
        """
        创建数据库连接对象和游标对象
        """
        self.conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password="xxx", database='xxx', charset='utf8')
        self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)  # 游标设置为字典类型

    def get_list(self, sql, args=None):
        """
        获取列表数据
        """
        self.cur.execute(sql, args)
        data_list = self.cur.fetchall()
        return data_list

    def get_one(self, sql, args=None):
        """
        获取单条数据
        """
        self.cur.execute(sql, args)
        data = self.cur.fetchone()
        return data

    def modify(self, sql, args=None):
        """
        更新、删除单条数据
        """
        self.cur.execute(sql, args)
        self.conn.commit()

    def bulk_modify(self, sql, args=None):
        """
        批量增加、更新、删除数据(好处:只连接一次,批量操作,只提交一次)
        """
        self.cur.executemany(sql, args)
        self.conn.commit()

    def create(self, sql, args=None):
        """
        增加单条数据,并返回最新自增ID
        """
        self.cur.execute(sql, args)
        self.conn.commit()
        return self.cur.lastrowid

    def close(self):
        """
        关闭连接
        """
        self.cur.close()
        self.conn.close()import pymysql


class SQLHelper(object):
    """
    PyMySQL操作数据库
    """
    def __init__(self):
        """
        在实例化对象时自动连接数据库
        """
        # 也可以根据需要将连接数据库的参数放到配置文件中,然后在初始化时读取
        self.connect()

    def open(self):
        """
        创建数据库连接对象和游标对象
        """
        self.conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password="xxx", database='xxx', charset='utf8')
        self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)  # 游标设置为字典类型

    def get_list(self, sql, args=None):
        """
        获取列表数据
        """
        self.cur.execute(sql, args)
        data_list = self.cur.fetchall()
        return data_list

    def get_one(self, sql, args=None):
        """
        获取单条数据
        """
        self.cur.execute(sql, args)
        data = self.cur.fetchone()
        return data

    def modify(self, sql, args=None):
        """
        更新、删除单条数据
        """
        self.cur.execute(sql, args)
        self.conn.commit()

    def bulk_modify(self, sql, args=None):
        """
        批量增加、更新、删除数据(好处:只连接一次,批量操作,只提交一次)
        """
        self.cur.executemany(sql, args)
        self.conn.commit()

    def create(self, sql, args=None):
        """
        增加单条数据,并返回最新自增ID
        """
        self.cur.execute(sql, args)
        self.conn.commit()
        return self.cur.lastrowid

    def close(self):
        """
        关闭连接
        """
        self.cur.close()
        self.conn.close()

3 数据库连接池

数据库连接池帮我们维护了若干个与数据库的连接,当我们需要连接数据库时,只需向连接池申请一个连接,当操作完数据库后,再将连接放回到连接池。在实际项目中,如果不用 ORM 要写原生 SQL 操作数据库,建议用数据库连接池,可以处理并发请求的场景。在 Python 中,可以通过 DBUtils 模块实现一个数据库连接池。此连接池有两种连接模式:

  • 模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。
  • 模式二:创建一批连接到连接池,供所有线程共享使用。

DBUtils 的安装

在虚拟环境下安装:

pip install pymysql DBUtils

基于函数封装

from dbutils.pooled_db import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='xxx',
    database='xxx',
    charset='utf8'
)


def get_list(sql, args=None):
    """
    获取所有数据
    :param sql: SQL语句
    :param args: SQL语句的占位参数
    :return: 查询结果
    """
    conn = POOL.connection()  # 去连接池中获取一个连接
    cur = conn.cursor()
    cur.execute(sql, args)
    result = cur.fetchall()
    cur.close()
    conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭
    return result


def get_one(sql, args=None):
    """
    获取单条数据
    :return: 查询结果
    """
    conn = POOL.connection()
    cur = conn.cursor()
    cur.execute(sql, args)
    result = cur.fetchone()
    cur.close()
    conn.close()
    return result


def modify(sql, args=None):
    """
    修改、增加、删除操作
    :return: 受影响的行数
    """
    conn = POOL.connection()
    cur = conn.cursor()
    result = cur.execute(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return result


def bulk_modify(sql, args=None):
    """
    批量修改、增加、删除操作
    :return: 受影响的行数
    """
    conn = POOL.connection()
    cur = conn.cursor()
    result = cur.executemany(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return result


def create(sql, args=None):
    """
    增加数据
    :return: 新增数据行的ID
    """
    conn = POOL.connection()
    cur = conn.cursor()
    cur.execute(sql, args)
    conn.commit()
    cur.close()
    conn.close()
    return cur.lastrowid


if __name__ == '__main__':
    result = get_list("select * from student")
    # result = get_one("select * from class where id=%s and title=%s", ['10', '五班'])
    # result = modify("update class set title=%s where id=%s", ['五班', '10'])
    # result = bulk_modify("update class set title=%s where id=%s", [('五班1', '10'), ('五班2', '11')])
    # result = create("insert into student (name, class_id) values (%s, %s)", ['张三', '10'])
    # result = bulk_modify("insert into student (name, class_id) values (%s, %s)", [('李四', '10'), ('王五', '10')])
    # result = modify("delete from student where id=%s", ['20', ])
    print(result)

6 基于类封装

 

from dbutils.pooled_db import PooledDB
import pymysql


class SQLHelper(object):

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='xxx',
            database='xxx',
            charset='utf8'
        )

    def open(self):
        conn = self.pool.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid


if __name__ == '__main__':
    db = SQLHelper()
    result = db.get_list("select * from student")
    # result = db.get_one("select * from class where id=%s and title=%s", ['10', '五班'])
    # result = db.modify("update class set title=%s where id=%s", ['五班', '10'])
    # result = db.bulk_modify("update class set title=%s where id=%s", [('五班1', '10'), ('五班2', '11')])
    # result = db.create("insert into student (name, class_id) values (%s, %s)", ['张三', '10'])
    # result = db.bulk_modify("insert into student (name, class_id) values (%s, %s)", [('李四', '10'), ('王五', '10')])
    # result = db.modify("delete from student where id=%s", ['20', ])
    print(result)

7 支持上下文管理

使用 threading.local 实现

import threading

from dbutils.pooled_db import PooledDB
import pymysql


class SQLHelper(object):
    """
    使用 threading.local 实现上下文管理,且是单例模式
    """

    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='xxx',
            database='xxx',
            charset='utf8'
        )
        self.local = threading.local()  # 维护一个栈
        """
        storage = {
            线程ID: {'stack': [(conn, cusor), ]},
        }
        """

    def open(self):
        conn = self.pool.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid

    def __enter__(self):
        conn, cur = self.open()
        rv = getattr(self.local, 'stack', None)
        if not rv:
            self.local.stack = [(conn, cur), ]
        else:
            self.local.stack.append((conn, cur))
        return conn, cur

    def __exit__(self, exc_type, exc_val, exc_tb):
        rv = getattr(self.local, 'stack', None)
        if not rv:
            return
        conn, cur = self.local.stack.pop()
        cur.close()
        conn.close()


if __name__ == '__main__':
    db = SQLHelper()

    with db as (conn, cur):
        cur.execute("insert into student (name, class_id) values (%s, %s)", ['赵六', '10'])
        conn.commit()

    with db as (conn, cur):
        cur.execute("select * from student")
        # result = cur.fetchmany(3)  # 取前3条
        result = cur.fetchall()
        print(result)

8 普通实现方式

from dbutils.pooled_db import PooledDB
import pymysql

# 全局变量定义连接池,只加载一次
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    ping=0,  # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='xxx',
    database='xxx',
    charset='utf8'
)


class SQLHelper(object):
    """
    支持上下文管理,非单例模式
    """

    def __init__(self):
        self.conn = None
        self.cur = None

    def open(self):
        conn = POOL.connection()  # 去连接池中获取一个连接
        cur = conn.cursor()
        return conn, cur

    def close(self, conn, cur):
        cur.close()
        conn.close()  # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭

    def get_list(self, sql, args=None):
        """
        获取所有数据
        :param sql: SQL语句
        :param args: SQL语句的占位参数
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close(conn, cur)
        return result

    def get_one(self, sql, args=None):
        """
        获取单条数据
        :return: 查询结果
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close(conn, cur)
        return result

    def modify(self, sql, args=None):
        """
        修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def bulk_modify(self, sql, args=None):
        """
        批量修改、增加、删除操作
        :return: 受影响的行数
        """
        conn, cur = self.open()
        result = cur.executemany(sql, args)
        conn.commit()
        self.close(conn, cur)
        return result

    def create(self, sql, args=None):
        """
        增加数据
        :return: 新增数据行的ID
        """
        conn, cur = self.open()
        cur.execute(sql, args)
        conn.commit()
        self.close(conn, cur)
        return cur.lastrowid

    def __enter__(self):
        self.conn, self.cur = self.open()
        return self.conn, self.cur

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cur.close()
        self.conn.close()


if __name__ == '__main__':
    with SQLHelper() as (conn, cur):
        cur.execute("insert into student (name, class_id) values (%s, %s)", ['孙七', '10'])
        conn.commit()

    with SQLHelper() as (conn, cur):
        cur.execute("select * from student")
        # result = cur.fetchmany(3)  # 取前3条
        result = cur.fetchall()
        print(result)