sqlalchemy——python的一款开源orm工具

发布时间 2023-04-10 21:38:58作者: leethon

sqlalchemy——python的一款开源orm工具

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

django框架中有自己的orm工具,sqlalchemy一般是flask、fastapi框架经常使用来操作数据库。

pip install sqlalchemy

快速使用

创建表模型

# 第一步:导入
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


# 第三步:继承生成的Base类
class User(Base):
    # 第四步:写字段
    id = Column(Integer, primary_key=True)  # 生成一列,类型是Integer,主键
    name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
    email = Column(String(32), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    # extra = Column(Text, nullable=True)

    # 第五步:写表名 如果不写以类名为表名
    __tablename__ = 'users'  # 数据库表名称

    # 第六步:建立联合索引,联合唯一
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        Index('ix_id_name', 'name', 'email'),  # 索引
    )


class Book(Base):
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

创建一个引擎

sqlalchemy依赖pymysql等第三方插件操作各类数据库。不同的数据库连接的方式会有区别,如要借助pymysql去连接mysql,那么则采取mysql+pymysql://用户名:密码@域名:3306端口/库名

更多数据库的引擎连接方式可以参考官方文档:

https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls

# 不会创建库,只会创建表,很多orm工具都是直接连接创建好的库
# 简单的连接引擎(有些参数配置都有默认值)
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
# 加一些其他配置项的引擎
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/aaa",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 要同步的时候运行以下此py,其他模块引用engine时不会触发
if __name__ == '__main__':  
    # 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
    Base.metadata.create_all(engine)

    # 把所有表删除
    # Base.metadata.drop_all(engine)

数据库会话session

我们可以基于之前创建的数据库engine对象,建立会话session

Session是为特定用户识别和管理状态的一种方式,这里命名为session

我们经历下列步骤就可以简单的建立一个数据库会话。

Session=sessionmaker(bind=engine)  # 生成Session类
session=Session()  # 实例化拿到一个会话

我们的一个项目中,可能会基于一个引擎只创建一个Session类,这里会定义我们连接池的大小等配置;但是每一次请求的会话应该是独立的,需要每次都重新拿到一个session对象。

比方说,我们的engine决定了连接池为5,那么基于这个引擎创建的Session类就是连接池为5,我们再通过Session类产生的session会话,实际上是原本就建立好的5条连接中取出的一条连接,而在每个请求中,如果要对数据库进行操作,我们就需要通过session=Session()取出了一个连接,在此基础上操作数据库。

以下是一个简单的例子:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import User  # 导入数据表

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa") # 连接本地mysql数据库中的aaa
Session=sessionmaker(bind=engine)

def task(i):  # 假设这个函数就是异步函数
	session=Session()   # 每次请求应该重新发起会话,所以此句放在函数内部
    user = User(name=f'张{i}')  # 产生一个数据对象
    session.add(user)  # 将数据对象插入数据库
    session.commit()  # 实际提交生效

session线程安全

但是依照上面的方式,我们每一个函数都需要自己去写session=Session(),有些麻烦,所以我们可以导入全局session,这个与flask框架中的request等全局变量类似,采取全局代理的模式,效果每个线程任务中的session对话虽然用的是一个session变量,但是实际操作的是不同会话,不会出现数据错乱。

这里简述的理论不理解没关系,用下面的代码即可:

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine

engine = create_engine("数据库连接")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)  # 还是通过Session产生对象,但是变成了类似于threading.Local的类型

def task():
    # 异步任务中也可以直接使用全局的session
    session.add(User(name='leethon'))

简单的增查删改

:通过表模型产生数据对象,通过session.add函数将这一条数据增加到数据库

user = User(name=f'张三')  # 产生一个数据对象
session.add(user)  # 将数据对象插入数据库
session.add_all(user1,user2)  # 如果有多条数据,则可以通过add_all批量添加
  • 至少要按照表模型的必填字段产生数据对象

:查是数据库操作的基本核心操作,查询到数据才能进行修改和删除操作

# select * from users
session.query(User)  # 全查
# select * from users where name='张三'
session.query(User).filter(User.name == '张三')
session.query(User).filter_by(name='张三')

## 以上的结果都为query.Query对象,并没有实际的到数据库里去查,如果打印它们,会返回对应的原生sql
## 如果想拿到结果,则需要使用all()方法或first()方法
res = session.query(User).filter(User.name == '张三')
res.all()  # 列表套数据对象
res.first()  # 数据对象
res.first().id  # 可以通过数据对象点出相应的id

使用filter方法查询时,括号中放的是伪条件,可以使用==、<、>、!=、<=、>=等来表示对应的where筛选条件,左侧为字段名称,右侧为值(也可能为其他字段名称来表示对应字段值)

上述查询语句拿到的query.Query对象再进行.delete()即可

res = session.query(User).filter(User.name == '张三').delete()
print(res)  # 1   返回值是影响的行数
session.commit()

上述查询语句拿到的query.Query对象再进行.update({'字段名':'值','字段2':'值2'})即可

res = session.query(User).filter(User.id == 1).update({'name':'three Z'})
print(res)  # 1   返回值是影响的行数
session.commit()

还可以使用数据对象修改:

res_obj = session.query(User).filter(User.id == 1).first()  # 拿到一个具体的数据对象
session.add(res_obj)  # 使用add函数修改
session.commit()

改与增都用add(一个数据对象),是sqlalchemy对数据对象进行了一个主键的判断,如果这个数据对象没有主键,那么一定是进行增加操作,如果有主键,那么修改数据库中对应主键的数据。

还可以基于原本的字段数据做修改,如原本的name后面统一加一个字符串拼接。

session.query(User).update({User.name: User.name+1}, synchronize_session=False)

session.commit()

## 基于原本的数字字段 做运算
session.query(User).update({User.age: User.age+1}, synchronize_session='evaluate')

查询语法

查局部字段和取别名

res = session.query(User.name.label('username'), User.email).all()  # label取别名

print([i.username for i in res])  # ['李四','王五']

filter与filter_by

  • filter中写where筛选条件,可以含比较关系,取反,成员运算,模糊匹配等
  • filter_by中只能按关键字传参的方式指定筛选条件。
res = session.query(User).filter(User.name != "leethon").all()
res = session.query(User).filter(User.name != "leethon", User.id == 2).all()  # 多个条件,默认为and关系,要求同时成立
res = session.query(User).filter_by(name='leethon').all()

取all得到列表套数据,取first得到一条数据对象。

filter和filter_by相同点是,它们是链式函数,传入query.Query对象,返回也是query.Query对象,这也意味着可以多个filter和filter_by叠在一块查询

res = session.query(User).filter(User.name != "leethon")
res1 = res.filter(User.age == 18)

执行原生sql

res=session.query(User).from_statement(text("SELECT * FROM users where email=:email")).params(email='3@qq.com').all()

使用:变量名来占位,在params中填充参数

ps:这和直接拼接字符串相比,可以防止sql注入问题

between、in_、~

# between 与sql中的between同理,两者之间
res = session.query(User).filter(User.id.between(1, 9)).all()

# in_ 成员运算
res = session.query(User).filter(User.id.in_([1,3,4])).all()

# ~ 非,除去以外的,取反
res = session.query(User).filter(~User.id.in_([1,3,4])).all()
# print(res)

与或

多个逗号隔开的条件默认都是与关系,提供的and_主要是方便我们条件嵌套时可能会用到。

from sqlalchemy import and_, or_

# 两个条件满足一个即可
res = session.query(User.name, User.email) \
	.filter(or_(User.email == '2@qq.com', User.id == 2)).all()
# and_与or_的嵌套使用
res = session.query(User.name, User.email) \
    .filter(or_(User.email == '2@qq.com', and_(User.id == 2, User.name == '王五'))).all()

模糊查询

# select user.id from user where  user.name not like e%;
res = session.query(User.id).filter(~User.name.like('e%'))

分页和排序

# 分页
session.query(User)[2*5:2*5+2]  # query.Query对象使用切片的语法,等同于sql的limit
# 排序
session.query(User).order_by(User.email.desc()).all()  # 降序排
session.query(User).order_by(User.name.desc(), User.id.asc())  # 第一个字段降序排,重复后按第二个字段升序排

分组查询和聚合函数

from sqlalchemy.sql import func  # 聚合函数

# 按照ctime分组(默认按天)
res = session.query(
    User.ctime,
    func.max(User.id).label('max_id'),
    func.sum(User.id).label('sum_id'),
    func.min(User.id)).group_by(User.ctime).all()
print(res)
print([i.ctime.date() for i in res])  # 取出字段后,由于是datetime类型可以取一些特殊形式

# having
res = session.query(
    User.ctime).filter('分组前筛选').group_by(User.ctime).having('分组后筛选').all()

分组查询的字段,如果是严格模式only_full_group下,那么只能是分组字段和聚合函数,这是由数据库的配置决定的。

按月份、年份等分组

from sqlalchemy import extract  
from sqlalchemy.sql import func

session.query(extract('month', User.ctime).label('month'), func.count('*').label('count')).group_by('month').all()  

上述语句中,我们用extract('month', User.ctime),将时间字段的month月份单独拎出来作为一个字段,并用label('month')取了别名,最终用别名字段分组了。

所以可以总结一些需要注意的小点:

  • group_by内部可以写User.ctime这种字段变量,也可以写'month'这种映射的字符串(一般用于别名)
  • count这个聚合函数,数什么字段都是一样的,我们也可以写'*'这样的映射的字符串,当然是因为它写起来更简单。

上述的两点独立于sqlalchemy的查询体系,当做固定用法即可。