sqlalchemy

发布时间 2023-04-10 21:27:55作者: 李阿鸡

sqlalchemy

flask中没有ORM框架对象映射关系, 我们需要使用ORM框架来帮助我们快速操作数据库,需要使用第三方模块。

flask 中使用sqlalchemy 比较多

它是一个基于python实现的ORM框架,该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

安装

pip install sqlalchemy

sqlalchemy 本身自己是无法操作数据库的,必须依赖pymysql等第三方插件

# 连接格式
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

sqlalchemy快速使用

# 导入
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 创建引擎对象
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',
    max_overflow=0,  # 超过连接池大小外最多创建的连接  0就是0+5 加上链接池的数量
    pool_size=5,  # 连接池大小 # sqlalchemy自带的连接池
    pool_timeout=30,  # 池中没有线程最多等待的时间,超过时间报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) -1就是不回收
)

# 使用引擎对象获取连接,操作数据库
conn =engine.raw_connection()  # 获取连接对象
cursor = conn.cursor()         # 获取游标
cursor.execute('select * from luffy_course')  # 执行sql
print(cursor.fetchall())

使用sqlalchemy创建表并操作数据

在django中关于创建表我们都是放在models.py 里,这里也新建一个models.py

# 导入
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# 生成基类
Base = declarative_base()
# 继承基类
class Userinfo(Base):
    #   写字段
    id = Column(Integer, primary_key=True)  # 生成一列,整型,主键
    name=Column(String(32),index=True,nullable=False)  # 生成一列,字符串,索引,不为空
    email=Column(String(32),unique=True)  # 生成一列,字符串,唯一索引
    # datetime.datetime.now 取当前时间
    ctime=Column(DateTime,default=datetime.datetime.now)  # 生成一列,时间,时间默认值 不要写now() 不然以后每次都是一样的
    extra=Column(Text,nullable=True)  # 生成一列,文本,可以为空

    # 生成表名
    __tablename__ = 'userinfo'

    # 生成联合唯一索引
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一索引
        Index('ix_id_name', 'name', 'email'), # name 和email建立联合普通索引
    )

class Book(Base):
    __tablename__='books'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    

# 同步到数据库
# 创建引擎对象
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',  # 不会创建库 会创建表需要提前手动创建库
    max_overflow=0,  # 超过连接池大小外最多创建的连接  0就是0+5 加上链接池的数量
    pool_size=5,  # 连接池大小 # sqlalchemy自带的连接池
    pool_timeout=30,  # 池中没有线程最多等待的时间,超过时间报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) -1就是不回收
)

# 同步到数据库 把Base管理的表都同步创建到数据库中
Base.metadata.create_all(engine) # 不能修改字段,需要借助于第三方


# 删除所有表 把Base管理的表全都删掉
# Base.metadata.drop_all(engine)

快速插入数据

# 导入模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 生成engine对象
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',  # 不会创建库 会创建表需要提前手动创建库
    max_overflow=0,  # 超过连接池大小外最多创建的连接  0就是0+5 加上链接池的数量
    pool_size=5,  # 连接池大小 # sqlalchemy自带的连接池
    pool_timeout=30,  # 池中没有线程最多等待的时间,超过时间报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) -1就是不回收
)

# 拿到session类 传入engine对象
Session = sessionmaker(bind=engine)

# 拿到session对象 相当于conn连接对象
session = Session()


# 新增数据,导入表
from models import Book
# 实例化对象增加数据
book = Book(name='红楼梦')
session.add(book)
# 提交
session.commit()

#关闭
session.close()

scoped_session线程安全

不使用scoped_session来在视图函数中使用sqlalchemy添加数据

from flask import Flask
# 导入用户表
from models import Userinfo

app = Flask(__name__)

# 1.生成engine对象,导入也行
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/luffy")  # 也可以只写一行,其他的参数都是默认的

# 拿到session类 传入engine对象
Session = sessionmaker(bind=engine)

@app.route('/')
def index():
    # 访问这个接口的时候,会在数据库中创建一条数据
    user = Userinfo(name='lxj', email='1@qq.com', extra='110')
    # 可以把session放到视图函数里面,也可以放到外面
    """
    外面: 全局都用一个session,数据会发生错乱 会有并发安全问题
    在flask中就用全局的,不用每次请求都创建一个session 使用scoped_session
    里面: 每次请求都会创建一个session,数据不会发生错乱   """
    # 不使用scoped_session
    session = Session()
    session.add(user)
    session.commit()
    session.close()
    return '增加成功'


if __name__ == '__main__':
    app.run()

把session对象放在外面,全局都使用一个,会导致数据错乱,发生并发安全问题。

使用scoped_session 可以解决这个问题

只需要用类实例化得到session对象的时候用scoped_session 包裹一下后,就是线程安全的。

from sqlalchemy.orm import scoped_session
session = scoped_session(Session)

内部使用了local对象,取当前线程的session,如果当前线程有,就直接返回用,如果没有,创建一个,放到local中, session 是 scoped_session 的对象

我们在查看源码的时候发现scoped_session类上面有一个装饰器,装饰器也可以加在类上面

其原理如下

# session 是  scoped_session 的对象,类上没有属性和方法,但是,用的时候,确实用
session = scoped_session(Session) 


def speak():
    print('汪汪汪')


def wrapper(func):
    def inner(*args, **kwargs):
        res = func()
        res.name = '张红'   # func就是Person,加括号就得到了Person对象 ,可以给对象添加属性 
        res.speak = speak   # 也可以给对象添加方法
        return res

    return inner


@wrapper
class Person:
    pass


p = Person()

print(p.name)
p.speak() 

基本的增删改查

前置操作

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models import Userinfo,Book
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',
)

Session = sessionmaker(bind=engine)
session = scoped_session(Session)


查询语句

session.commit()
session.close()
# 下面就不贴了

增加操作

"""
add         添加一条数据
add_all     添加多条数据[对象1,对象2]
"""
user = Userinfo(name='lz',email='4@qq.com',extra='122')
user1 = Userinfo(name='hg',email='3@qq.com',extra='119')
book = Book(name='穿越大海')
# session.add(user) # 只能放一个对象
session.add_all([user,user1,book])  # 可以放任意对象

查询操作

# 删除操作,先查出来再删除
"""
session.query(表名)   拿出来是个列表不能使用first,可以写多个,联表操作 
filter            过滤条件,需要 写表达式 == >= <= !=   > < in
filter_by          直接写字段 = 值 式 不要写表达式


"""
# user = session.query(Userinfo).filter(Userinfo.name=='hg').all()
# print(user[0].name) # 因为是个列表,所以要索引取值
res = session.query(Userinfo).filter(Userinfo.id > 1).all()
print(res) # 这里的res是个容器。需要在表模型中写__repr__方法
# 表模型中写
	def __repr__(self):
    	return self.name

#filtet_by  直接写字段 = 值 式 不要写表达式
user = session.query(Userinfo).filter_by(name='lxj').first()
print(user)

删除操作

# 先查再删
user = session.query(Userinfo).filter_by(name='lz').delete()  # 不要first()或者all()出来在delete(),直接.delete()就行
print(user) # 返回删除影响的行数

修改操作

# 先查再改
# 方式一 update修改
user = session.query(Userinfo).filter_by(name='lxj').update({'email':'888@qq.com'})
print(user)

# 方式二 对象修改
user = session.query(Userinfo).filter_by(name='lxj').first()
user.name = '李阿鸡'
session.add(user)  # 这个对象,如果有id(主键)就是修改,没有id(主键)就是添加,在flask的表模型中如果不写id 是不会自动像django一样自动创建id的
print(user.name)

高级查询

查询某几个字段

res = session.query(Userinfo.name.label('xx'),Userinfo.email)
print(res)     # 可以打印出原生sql
print(res.all)  # 可以 .出所有对象
for item in res.all():
     print(item[0])   # 循环取出所有对象

查询所有

# 4.1 查询所有  是list对象
res = session.query(User).all()  # 是个普通列表
print(type(res))
print(len(res))

查询的参数

# 4.1.2 filter传的是表达式,filter_by传的是参数
res = session.query(User).filter(User.name == "lqz").all()
res = session.query(User).filter(User.name != "lqz").all()
res = session.query(User).filter(User.name != "lqz", User.email == '3@qq.com').all()  # django 中使用 Q
res = session.query(User).filter_by(name='lqz099').all()
res = session.query(User).filter_by(name='lqz099',email='47@qq.com').all()
print(len(res))

查询后的结果是list没有first方法

res = session.query(User).first()

查询所有可以使用占位符

需要导入text

from sqlalchemy import text
# 固定写法必须 加 :
res = session.query(Userinfo).filter(text("id<:value or name=:name")).params(value=4,name='lxj').all()  
print(res[0])

自定义查询

可以自己写原生sql进行查询,支持传参

res = session.query(Userinfo).from_statement(text("select * from userinfo where id<:id")).params(id=4).all()
print(res)

表达式 and条件链接

res = session.query(Userinfo).filter(Userinfo.id<4,Userinfo.name=='lxj').all()
print(res)

betwwen范围之间

# between 两个值之间
res = session.query(Userinfo).filter(Userinfo.id.between(1,3)).all() 
print(res)

in_([ ]) 是否存在 在则拿出对象

res = session.query(Userinfo).filter(Userinfo.id.in_([1,2,4])).all()

与或非 ~

# 不存在
res = session.query(Userinfo).filter(~Userinfo.id.in_([1,2])).all()
print(res)

二次筛选(sql中的子查询)

res = session.query(Userinfo).filter(~Userinfo.id.in_(session.query(Userinfo.id).filter_by(name='lxj'))).all()
print(res)

and or

from sqlalchemy import and_, or_
# or_包裹的都是or条件,and_包裹的都是and条件
res = session.query(User).filter(and_(User.id >= 3, User.name == 'lqz099')).all()  #  and条件
res = session.query(User).filter(User.id < 3, User.name == 'lqz099').all()  #  等同于上面
res = session.query(User).filter(or_(User.id < 2, User.name == 'eric')).all()
res = session.query(User).filter(
    or_(User.id < 2,and_(User.name == 'lqz099', User.id > 3),
         User.extra != ""
     )).all()

通配符

res = session.query(User).filter(User.email.like('%@%')).all() # email 里只要含有@ 符号的就能查到

res = session.query(User.id).filter(~User.name.like('e%')) # 名字不以e开头的

分页

# 一页2条,查第5页
res = session.query(User)[2*5:2*5+2]  # page * size :page *size +size

排序

desc 降序   asc升序
# 排序,根据name降序排列(从大到小)
res = session.query(User).order_by(User.email.desc()).all()
res = session.query(Book).order_by(Book.price.desc()).all()
res = session.query(Book).order_by(Book.price.asc()).all()
# 第一个条件重复后,再按第二个条件升序排
res = session.query(User).order_by(User.name.desc(), User.id.asc())

分组查询 5个聚合函数

from sqlalchemy.sql import func
# res = session.query(User).group_by(User.extra)  # 如果是严格模式,就报错

res = session.query(
     User.extra,  			# 当前分组
     func.max(User.id), 	# 最大id
     func.sum(User.id), 	# id之和   下面是最小id
     func.min(User.id)).group_by(User.extra).all() 

having 分组后过滤

# select max(id),sum(id),min(id) from  user group by  user.extra   having id_max>2;
res = session.query(
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id)).group_by(User.extra).having(func.max(User.id) > 2)

写原生sql

方式一

# 第一步:导入
from sqlalchemy import create_engine
# 第二步:生成引擎对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/cnblogs",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第三步:使用引擎获取连接,操作数据库
conn = engine.raw_connection()
cursor=conn.cursor()
cursor.execute('select * from aritcle')
print(cursor.fetchall())

方式二

from models import Userinfo, Book
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/luffy")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# # SQLAlchemy 2.0.9 版本需要使用text包裹一下,之前版本不需要
cursor = session.execute(text("select * from userinfo"))
result = cursor.fetchall()
print(result)
# 插入数据
cursor = session.execute(text('insert into books(name) values(:name)'), params={"name": '红楼梦'})
session.commit()
print(cursor.lastrowid)  # 拿新增的id号


django中写原生sql

from django.shortcuts import render,HttpResponse

# Create your views here.
from .models import Book
def index(request):
    books = Book.objects.raw('select * from app01_book where id=1')
    print(books)      # <RawQuerySet: select * from app01_book where id=1>
    print(type(books))   # 是一个RawQuerySet对象
    return HttpResponse('ok')

建表

一对多

# 一对多关系
# 导入
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
# 生成基类
Base = declarative_base()

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)  # 主键
    caption = Column(String(32))


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer,primary_key=True)
    name = Column(String(32),index=True,nullable=False) # 建立普通索引,不为空
    # 关联字段写在多的一方
    hobby_id = Column(Integer,ForeignKey('hobby.id'))  # 外键关联hobby表的id字段,hobby指的是tablename名

    hobby = relationship('Hobby',backref='pers')  # 反向引用,建立关系,建立关系的字段名字是hobby,backref是反向引用的字段名字

    def __repr__(self):
        return self.name


# 同步到数据库
# 创建引擎对象
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',  # 不会创建库 会创建表需要提前手动创建库
    max_overflow=0,  # 超过连接池大小外最多创建的连接  0就是0+5 加上链接池的数量
    pool_size=5,  # 连接池大小 # sqlalchemy自带的连接池
    pool_timeout=30,  # 池中没有线程最多等待的时间,超过时间报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) -1就是不回收
)

# 同步到数据库 把Base管理的表都同步创建到数据库中
Base.metadata.create_all(engine)  # 不能修改字段,需要借助于第三方

一对多关系表新增
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models1 import Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/luffy")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 一对多新增
# 方式1 先新增后 修改外键字段
# hobby = Hobby(caption='篮球')
# session.add(hobby)
# person = Person(name='lxj')
# session.add(person)

# 2 先新增一 查出来后 在新增多
# hobby = session.query(Hobby).filter(Hobby.caption == '篮球').first()
# person = Person(name='lxj', hobby_id=hobby.id)
# session.add(person)

# 3,想要按对象的方式新增,就要写relationship
# 拿到hobby对象后,直接在hobby对象上新增person对象
# hobby = session.query(Hobby).filter(Hobby.caption == '篮球').first()
# person = Person(name='王五',hobby=hobby)  # 必须先用relationship 写了,才能hobby=对象
# session.add(person)
# 方式二
hobby=Hobby(caption='足球')
person=Person(name='李四',hobby=hobby)
session.add_all([hobby,person])

session.commit()
session.close()

基于对象跨表查询
# 基于对象跨表查询
#正向查询
person = session.query(Person).filter(Person.name == 'lxj').first()
print(person.hobby)      # 直接 . 对象
# 反向查询
hobby = session.query(Hobby).filter(Hobby.caption == '篮球').first()
print(hobby.pers)     # 直接 . relationship写的backref名字

多对多

只能手动创建第三张关系表

# 一对多关系
# 导入
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 生成基类
Base = declarative_base()


# 手动创建第三张关系表
class Boy2Girl(Base):
   __tablename__ = 'boy2girl'
   id = Column(Integer, primary_key=True, autoincrement=True)
   girl_id = Column(Integer, ForeignKey('girl.id'))
   boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
   __tablename__ = 'girl'
   id = Column(Integer, primary_key=True, autoincrement=True)  # 主键自增
   name = Column(String(32), unique=True, nullable=False)  # 唯一 不为空

   def __str__(self):
       return self.name

   def __repr__(self):
       return self.name


class Boy(Base):
   __tablename__ = 'boy'
   id = Column(Integer, primary_key=True, autoincrement=True)  # 主键自增
   name = Column(String(32), unique=True, nullable=False)  # 唯一 不为空
   
   #   secondary 查询时经过的中间表
   girls = relationship('Girl', secondary='boy2girl', backref='boys')

   def __str__(self):
       return self.name

   def __repr__(self):
       return self.name


# 同步到数据库
# 创建引擎对象
engine = create_engine(
   'mysql+pymysql://root:123@127.0.0.1:3306/luffy',  # 不会创建库 会创建表需要提前手动创建库
   max_overflow=0,  # 超过连接池大小外最多创建的连接  0就是0+5 加上链接池的数量
   pool_size=5,  # 连接池大小 # sqlalchemy自带的连接池
   pool_timeout=30,  # 池中没有线程最多等待的时间,超过时间报错
   pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) -1就是不回收
)

# 同步到数据库 把Base管理的表都同步创建到数据库中
Base.metadata.create_all(engine)  # 不能修改字段,需要借助于第三方

多对多关系新增
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models2 import Boy,Girl,Boy2Girl

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/luffy")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 新增
# 1.笨办法,先新增boy和gril 后再去建立关系
girl =Girl(name='牛晓梅')
boy = Boy(name='李晓健')
session.add_all([girl,boy])
session.add(Boy2Girl(girl_id=1,boy_id=1))

# 方式2 使用relationship
boy= Boy(name='周衍根')
boy.girls=[Girl(name='大幂幂'),Girl(name='大热巴')] # 会自动经过第三张关系表并保存关系
session.add(boy)


session.commit()
基于对象的跨表查询
# 正向查询 relationship写在哪个表 就是正向
boy = session.query(Boy).filter(Boy.id==2).first()
print(boy.girls)

# 反向
girl = session.query(Girl).filter(Girl.id==2).first()
# relationship的backref
print(girl.boys)

联表跨表查询

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
engine = create_engine(
    'mysql+pymysql://root:123@127.0.0.1:3306/luffy',
)

Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 联表查询语句

session.commit()
session.close()

# 导入表
from models1 import Person,Hobby
# 联表操作
# 联表后过滤
res = session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id).all()

# 自己联表查 inner join 内连接
res = session.query(Person).join(Hobby).all()
# left join 左查询 isouter=True 代表左连接
res = session.query(Person).join(Hobby,isouter=True).all()
# right join通过 调换表的顺序实现
res = session.query(Hobby).join(Person,isouter=True).all()