flask:sqlalchemy快速插入数据、基于scoped_session实现线程安全、基本增删查改、一对多、多对多、连表查询

发布时间 2023-04-11 15:01:44作者: wwwxxx123

一、sqlalchemy快速插入数据

# 使用orm插入
from models import Book, User
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 第一步:生成engine对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/flask",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第二步:创建一个Session类,传入engine
Session = sessionmaker(bind=engine)

# 第三步:创建session对象,相当于连接对象(会话)
session = Session()
# 这里的session是约定俗称的,不是cookie、session的那个session
# 第四步,增加数据
book = Book(name='红楼梦')
session.add(book)

# 提交事务
session.commit()
# 关闭session
session.close()

运行代码后,可以看到成功创建了一条记录

img

二、基于scoped_session实现线程安全

2.1 基本使用

from sqlalchemy.orm import scoped_session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 第一步:生成engine对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/flask",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:拿到一个Session类,传入engine
Session = sessionmaker(bind=engine)

# 第三步:创建session对象,相当于连接对象(会话)
session = Session()

这里的步骤三我们放到视图中去的时候,如果启动了多线程执行,有以下两个情况:

1、我们将步骤三创建session的代码放到视图方法中去,这会导致每个线程都跟数据库创建了一个连接会话,消耗资源过多

2、我们将session创建在视图方法外面,这样全局都用的是一个session,这也意味这只使用一个连接会话去执行这些操作,这会导致数据混乱

上面的两种情况,我们发现多线程运行的时候都不安全(出现并发安全问题)

知道了问题,我们就想:怎么把session对象做成线程安全的?

结论就是使用scoped_session模块

使用scoped_session实现线程安全的代码

# from flask import Flask
from models import User

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 第一步:生成engine对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/aaa",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:拿到一个Session类,传入engine
Session = sessionmaker(bind=engine)


# 因为全局都用一个session对象,会有并发安全问题,于是sqlalchemy就使用scoped_session解决了这个问题

from threading import Thread
from sqlalchemy.orm import scoped_session


# session 是  scoped_session 的对象
session = scoped_session(Session)
# 内部使用了local对象,取当前线程的session,如果当前线程有,就直接返回用,如果没有,创建一个,放到local中

# 这时候的session对象,不是Session的对象,但是它有Session的所有方法
def task():
    user = User(name='lqz', email='3@qq.com', extra='asdfasfd')

    session.add(user)
    session.commit()
    session.close()
    return '增加成功'


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()

2.2 scoped_session源码及加在类上的装饰器

scoped_session部分源码

class scoped_session(Generic[_S]):

    _support_async: bool = False

    session_factory: sessionmaker[_S]

    registry: ScopedRegistry[_S]

    def __init__(
        self,
        session_factory: sessionmaker[_S],
        scopefunc: Optional[Callable[[], Any]] = None,
    ):

        self.session_factory = session_factory

        if scopefunc:
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            self.registry = ThreadLocalRegistry(session_factory)

通过他的源码,我们发现我们传进去的参数并没有被直接使用,同时双下init方法中创建的属性在经过if判断的时候也肯定是空的,因为我们传进来的参数没被这两个属性使用、绑定

因此if语句肯定会走else分支:

self.registry = ThreadLocalRegistry(session_factory)

于是我们接着去看ThreadLocalRegistry的源码

class ThreadLocalRegistry(ScopedRegistry[_T]):

    def __init__(self, createfunc: Callable[[], _T]):
        self.createfunc = createfunc
        self.registry = threading.local()

在这里我们看到registry属性的值是threading.local(),回顾之前学的loacl对象,我们可以得知他也是用local解决并发安全问题的

因此我们得出结论:现在的session对象,不是Session的对象,但是它有Session的所有方法

于是下一个问题就出现了:我们没有在上面的源码中找到scoped_session是怎么把原本Session中的方法拿过去的

这时候我们看看scoped_session源码的上面,我们可以看到一个庞大的装饰器,他是一个加在类上的装饰器

@create_proxy_methods(
    Session,
    ":class:`_orm.Session`",
    ":class:`_orm.scoping.scoped_session`",
    classmethods=["close_all", "object_session", "identity_key"],
    methods=[
        "__contains__",
        "__iter__",
        "add",
        "add_all",
        "begin",
        "begin_nested",
        "close",
        "commit",
        "connection",
        "delete",
        "execute",
        "expire",
        "expire_all",
        "expunge",
        "expunge_all",
        "flush",
        "get",
        "get_bind",
        "is_modified",
        "bulk_save_objects",
        "bulk_insert_mappings",
        "bulk_update_mappings",
        "merge",
        "query",
        "refresh",
        "rollback",
        "scalar",
        "scalars",
    ],
    attributes=[
        "bind",
        "dirty",
        "deleted",
        "new",
        "identity_map",
        "is_active",
        "autoflush",
        "no_autoflush",
        "info",
    ],
)
class scoped_session(Generic[_S]):

加在类上的装饰器

上面scoped_session源码上的装饰器很复杂,因此我们写一个简单的装饰器进行分析

def speak():
    print('说话了')


def wrapper(func):
    def inner(*args, **kwargs):
        res = func()
        res.name = 'lqz'
        res.speak = speak
        return res

    return inner


@wrapper  # 语法糖会把Person当参数传入到装饰器中   Person=wrapper(Person)
class Person:
    pass


p = Person()

print(p.name)
p.speak()

这里我们可以看到,经过装饰器后,原本的类还是原本的类,但是被我们添加了别的东西

因此我们可以得出下列结果

scoped_session = create_proxy_methods(scoped_session,...)

因此这时候创建的session对象就是scoped_session创建的对象

create_proxy_methods源码

def create_proxy_methods(
    target_cls: Type[Any],
    target_cls_sphinx_name: str,
    proxy_cls_sphinx_name: str,
    classmethods: Sequence[str] = (),
    methods: Sequence[str] = (),
    attributes: Sequence[str] = (),
) -> Callable[[_T], _T]:
    def decorate(cls):
        return cls

    return decorate

而通过他的源码以及内部的注释(注释太多没贴),我们可以得知,他相当于返回了一个类,具有原本的Session的所有方法(相当于我们创建了一个scoped_session的对象,但是给他添加了那些Session中的方法)

三、基本增删查改

在执行下方的查询操作时,我们会发现他打印的不是表中的值,而是一个个对象,这时候就需要我们去模型层中配置表的双下str和双下repr

    def __str__(self):  # 打印对象时,触发
        return self.name

    def __repr__(self):  # 如果对象在容器中,显示的中文
        return self.name
# 增,删,改
# 查 基本查询和高级查询


from models import User, Book
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/flask")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 1  增加:add   add_all
'add是添加单条数据,而add_all可以添加多个,多个对象可以是models中任意表模型的对象'
# user = User(name='pyy', email='44@qq.com', extra='摄氏度法')
# user1 = User(name='yyy', email='4@qq.com', extra='大沙发斯蒂芬')
# book = Book(name='西游记')
# # session.add(user)
# session.add_all([user, user1, book])  # 多个对象可以是models中任意表模型的对象
#
# session.commit()
# session.close()


# 2 基本查  filter  filter_by

# filer:写条件,条件中的等号要写成双等号
# filter_by:写等于的值,等号不用写成双等号

# filter
# 2.1 session.query(User)   中写表模型,可以写多个表模型(连表操作)  select * from User;
# 2.2 filter 过滤条件,必须写表达式  ==    >=    <=    != 等等  例:select * from user where user.id=1
# 2.3 all:普通列表  first
# user = session.query(User).filter(User.name == 'lqz').first()
# user = session.query(User).filter(User.name != 'lqz').all()
# print(user)
# res = session.query(User).filter(User.id > 1).all()
# print(res)

# filter_by  直接写等式    不能写成 User.name = 'lqz'
# user = session.query(User).filter_by(name='lqz').first()
# user = session.query(User).filter_by(id=2).first()
# user = session.query(User).filter_by(id=2).first()
# print(user)

# 3 删除(查到才能删) filter或filter_by查询的结果  不要all或first出来, .delete()即可
# res = session.query(User).filter_by(id=2).delete()
# session.commit()  # 一定不要忘了
# print(res) # 打印的是影响的行数


# 4 修改(查到才能改)
# 方式一:update修改
# res = session.query(User).filter_by(id=3).update({"name" : "彭于晏"})
# print(res)
# session.commit()
# 方式二,使用对象修改
# res = session.query(User).filter_by(id=3).first()
# res = session.query(User).filter_by(name='zzz').first()
# res.name='来来来'
# print(res.id)
# session.add(res)  # add 如果有主键,就是修改,如果没有主键就是新增
# session.commit()

3.1 基本增删查改和高级查询

# 4 查询: filer:写条件     filter_by:等于的值
# 4.1 查询所有  是list对象
# res = session.query(User).all()  # 是个普通列表
# print(type(res))
# print(len(res))

# 4.1.1 只查询某几个字段
# select name as xx,email from user;
# res = session.query(User.name.label('xx'), User.email)
# print(res)  # 打出原生sql
# # print(res.all())
# for item in res.all():
#     print(item[0])


# 4.1.2 filter传的是表达式,filter_by传的是参数
# res = session.query(User).filter(User.name == "lqz").all()
# res = session.query(User).filter(User.name != "lqz").all()
# res = session.query(User).filter(User.name != "lqz", User.email == '3@qq.com').all()  # django 中使用 Q
# res = session.query(User).filter_by(name='lqz099').all()
# res = session.query(User).filter_by(name='lqz099',email='47@qq.com').all()
# print(len(res))

# 4.2 取一个 all了后是list,list 没有first方法
# res = session.query(User).first()



# 4.3 查询所有,使用占位符(了解)  :value     :name
# select * from user where id <20 or name=lqz099
# res = session.query(User).filter(text("id<:value or name=:name")).params(value=10, name='lqz099').all()


# 4.4 自定义查询(了解)
# from_statement 写纯原生sql

# res=session.query(User).from_statement(text("SELECT * FROM users where email=:email")).params(email='3@qq.com').all()
# # print(type(res[0]))  # 是book的对象,但是查的是User表   不要这样写
# print(res[0].name)  #

# 4.5 高级查询
#  条件
# 表达式,and条件连接
# res = session.query(User).filter(User.id > 1, User.name == 'lqz099').all() # 默认情况就是and条件


# between
# res = session.query(User).filter(User.id.between(1, 9), User.name == 'lqz099').all()
# res = session.query(User).filter(User.id.between(1, 9)).all()

# in
# res = session.query(User).filter(User.id.in_([1,3,4])).all()
# res = session.query(User).filter(User.email.in_(['3@qq.com','r@qq.com'])).all()

# ~非,除。。外
# res = session.query(User).filter(~User.id.in_([1,3,4])).all()
# print(res)

# 二次筛选
# res = session.query(User).filter(~User.id.in_(session.query(User.id).filter_by(name='lqz099'))).all()
# 不能点all之后在接filter
# print(res)


# and or条件
from sqlalchemy import and_, or_

# or_包裹的都是or条件,and_包裹的都是and条件
# res = session.query(User).filter(and_(User.id >= 3, User.name == 'lqz099')).all()  #  and条件
# res = session.query(User).filter(User.id < 3, User.name == 'lqz099').all()  #  等同于上面
# res = session.query(User).filter(or_(User.id < 2, User.name == 'eric')).all()
# res = session.query(User).filter(
#     or_(
#         User.id < 2,
#         and_(User.name == 'lqz099', User.id > 3),
#         User.extra != ""
#     )).all()


# 通配符,以e开头,不以e开头
# res = session.query(User).filter(User.email.like('%@%')).all()
# select user.id from user where  user.name not like e%;
# res = session.query(User.id).filter(~User.name.like('e%'))


# 分页
# 一页2条,查第5页
# res = session.query(User)[2*5:2*5+2]

# 排序,根据name降序排列(从大到小)
# res = session.query(User).order_by(User.email.desc()).all()
# res = session.query(Book).order_by(Book.price.desc()).all()
# res = session.query(Book).order_by(Book.price.asc()).all()
# 第一个条件重复后,再按第二个条件升序排
# res = session.query(User).order_by(User.name.desc(), User.id.asc())



# 分组查询  5个聚合函数
from sqlalchemy.sql import func
# 聚合函数:Max最大、Min最小、Sum总和、Avg平均、count统计
# res = session.query(User).group_by(User.extra)  # 如果是严格模式,就报错
# 分组之后取最大id,id之和,最小id  和分组的字段
# res = session.query(
#     User.extra,
#     func.max(User.id),
#     func.sum(User.id),
#     func.min(User.id)).group_by(User.extra).all()
# for item in res:
#     print(item[2])

# having
# select max(id),sum(id),min(id) from  user group by  user.extra   having id_max>2;
res = session.query(
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id)).group_by(User.extra).having(func.max(User.id) > 2)

3.2 原生sql

### 方式一:
# 第一步:导入
from sqlalchemy import create_engine
# 第二步:生成引擎对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/cnblogs",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第三步:使用引擎获取连接,操作数据库
conn = engine.raw_connection()
cursor=conn.cursor()
cursor.execute('select * from aritcle')
print(cursor.fetchall())



### 方式二:
from models import User, Book
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 2.0.9 版本需要使用text包裹一下,原来版本不需要
# cursor = session.execute(text('select * from users'))
# result = cursor.fetchall()
# print(result)

cursor = session.execute(text('insert into books(name) values(:name)'), params={"name": '红楼梦'})
session.commit()
print(cursor.lastrowid)

session.close()

3.3 django中执行原生sql

参考博客:https://www.cnblogs.com/zhihuanzzh/p/16985734.html

# 选择的查询基表Book.objects.raw ,只是一个傀儡,正常查询出哪些字段,都能打印出来

def index(request):
    # books = Book.objects.raw('select * from app01_book where id=1')  # RawQuerySet  用起来跟列表一样
    # books = Publish.objects.raw('select * from app01_book where id=1')  # RawQuerySet  用起来跟列表一样
    # print(books[0])
    # print(type(books[0]))
    # # for book in books:
    # #     print(book.name)
    # # print(books[0].name)
    # print(books[0].addr)  #也能拿出来,但是是不合理的

    res = Book.objects.raw('select * from app01_publish where id=1')  # RawQuerySet  用起来跟列表一样
    print(res[0])
    print(type(res[0]))
    print(res[0].name)
    # book 没有addr,但是也打印出来了
    print(res[0].addr)

    return HttpResponse('ok')

四、一对多

# 一对一:本身是一个表,拆成两个表,做一对一的关联;;;本质就是一对多,只不过关联字段唯一
# 一对多:关联字段写在多的一方
# 多对多:需要建立中间表;;本质也是一对多

# 本质就只有一种外键关系

4.1 表模型

# 一对多关系
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    # 关联字段写在多的一方,写在Person中,跟hobby表中id字段做外键关联
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:就要加这个字段,取对象  person.hobby     pserson.hobby_id
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')  # 如果有hobby对象,拿到所有人 hobby.pers

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa", )

# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

4.2 新增和基于对象的查询

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models1 import Hobby, Person

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 一对多新增

# hobby = Hobby(caption='乒乓球')
# session.add(hobby)
# person = Person(name='张三')
# session.add(person)

# hobby=session.query(Hobby).filter(Hobby.caption=='乒乓球').first()
# # person = Person(name='王五',hobby_id=hobby.id)
# person = Person(name='王五',hobby_id=1)
# session.add(person)


# 支持按对象的增加方式,必须加relationship 做关联
# 方式一
# hobby=session.query(Hobby).filter(Hobby.caption=='乒乓球').first()
# person = Person(name='赵六',hobby=hobby)
# 方式二
# hobby = Hobby(caption='羽毛球')  # 表中暂时没有
# person = Person(name='赵六', hobby=hobby)
# session.add_all([person, hobby])
# session.commit()




## 基于对象的跨表查询  .
# 正向查询
# person=session.query(Person).filter(Person.name=='王五').first()
# # print(person.hobby_id)
# print(person.hobby)  # Hobby 的对象

# 反向查询
# hobby=session.query(Hobby).filter(Hobby.id==1).first()
# print(hobby.pers)



# 基于连表的查询(一会讲)

五、多对多

5.1 表模型

# 一对多关系
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


# 多对多

# 中间表  手动创建
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)



    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa", )


# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

5.2 增加和基于对象的跨表查询

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models2 import Girl, Boy, Boy2Girl

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 新增
# 1 笨办法新增
# girl=Girl(name='刘亦菲')
# boy=Boy(name='彭于晏')
# session.add_all([girl,boy])
# session.add(Boy2Girl(girl_id=1,boy_id=1))
# session.commit()

# 2 使用relationship
# boy = Boy(name='lqz')
# boy.girls = [Girl(name='迪丽热巴'), Girl(name='景田')]
# session.add(boy)
# session.commit()


# 基于对象的跨表查询
# 正向
# boy = session.query(Boy).filter(Boy.id==2).first()
# print(boy.girls)

# 反向
# girl = session.query(Girl).filter(Girl.id==2).first()
# print(girl.boys)


# 如果没有relationship,纯自己操作



# 基于连表的查询(一会讲)

六、连表查询

### 关联关系,基于连表的跨表查询
from models1 import Person,Hobby
# 链表操作
# select * from person,hobby where person.hobby_id=hobby.id;
# res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all()

# 自己连表查询
# join表,默认是inner join,自动按外键关联
# select * from Person inner join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby).all()

#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from Person left join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby, isouter=True).all()
# 没有right join,通过这个实现
# res = session.query(Hobby).join(Person, isouter=True).all()

# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from Person left join Hobby on Person.id=Hobby.id;
# res = session.query(Person).join(Hobby, Person.hobby_id == Hobby.id, isouter=True) #  sql本身有问题,只是给你讲, 自己指定链接字段
# 右链接
# print(res)



# 多对多关系连表
# 多对多关系,基于链表的跨表查
#方式一:直接连
res = session.query(Boy, Girl,Boy2Girl).filter(Boy.id == Boy2Girl.boy_id,Girl.id == Boy2Girl.girl_id).all()
# 方式二:join连
res = session.query(Boy).join(Boy2Girl).join(Girl).filter(Person.id>=2).all()