SqlAlchemy - Flask集成SqlAlchemy

发布时间 2023-05-04 17:36:35作者: Duosg

一、sqlalchemy 简介

1 sqlalchemy

在Flask中没有orm【对象关系映射】框架,方便我们快速操作数据库。但是在Flask,fastapi中用sqlalchemy居多

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API【数据库接口规范】之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

1.1 简介

SQLAlchemy是一个流行的Python ORM(Object-Relational Mapping)库,它允许Python开发人员使用Python对象来操作关系型数据库,而不需要直接编写SQL语句。

SQLAlchemy支持多种数据库,包括MySQL、PostgreSQL、SQLite和Oracle等。

1.2 使用

① 安装

pip3 install sqlalchemy

② 使用sqlalchemy 操作各种数据库
SQLAlchemy本身无法操作数据库,其必须使用pymsql等第三方插件

  • MySQL

    pymysql
    
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    
    
  • Oracle

cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
  • 更多
http://docs.sqlalchemy.org/en/latest/dialects/index.html

1.3 原生操作

# 1 导入
from sqlalchemy import create_engine

# 2 生成引擎对象
engine=create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/bbs001",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 3 使用引擎获取链接,操作数据库
conn = engine.raw_connection()
cursor = conn.cursor()
sql = 'select * from app01_blog;'
cursor.execute(sql)

print(cursor.fetchall())

1.4 创建、操作数据表

不能创建库,也不能修改字段【增加、删除字段】

# 第一步:导入
from sqlalchemy import create_engine
import datetime
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 第二步:执行declarative_base 得到一个类
Base = declarative_base()


# 第三步:继承生成的base类
class good_list(Base):
    # 第四步:写字段
    id = Column(Integer, primary_key=True)  # 生成一列,类型是Integer,主键
    name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
    category = Column(String(32), unique=True)  # unique不可重复
    create_time = Column(DateTime, default=datetime.datetime.now)
    desc = Column(Text, nullable=True)

    # 第五步:写表名 如果不写以类名为表名
    __tablename__ = 'goods'  # 数据库表名称

    # 第六步:联合建立索引

    __table_args__ = (
        # 联合唯一
        UniqueConstraint('id', 'name', name='unix_id_name'),
        # 索引
        Index('index_cate_name', 'name', 'category'),
    )


class Book(Base):
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))


# 第七步:把表同步到数据库中
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)

# 创建表
# Base.metadata.create_all(engine)

# 删除表
Base.metadata.drop_all(engine)

1.5 sqlalchemy快速插入数据

使用orm插入

1 使用engine对象
2 生成session对象
3 拿到session对象
4 增加数据
"插入数据:使用orm"
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Book


# 1 生成orm对象
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 2 拿到一个session类,绑定engine
Session = sessionmaker(bind=engine)

# 3 拿到session对象,相当于连接对象(session会话)
session = Session()

# 4 增加数据
book = Book(name='咖啡')
session.add(book)

# 5 提交数据 ,关闭session对象
session.commit()
session.close()

2 scoped_session线程安全

2.1 基本使用

生成的session会话是全局的,在多线程中会出现数据错乱的问题,导致线程的安全问题。

想要生成线程是安全的,可以使用内部的local对象,取代当前线程的session,如果当前线程中有local那么就直接使用,没有则新创建一个

from sqlalchemy.orm import scoped_session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 1 生成session对象
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 2 拿到session类,闯入engine对象
Session = sessionmaker(bind=engine)

# session = Session() 这样创建的session 会有线程不安全的问题

# 生成的session会话是全局的,在多线程中会出现数据错乱的问题,导致线程的安全问题
# 想要生成线程是安全的,可以使用内部的local对象,取代当前线程的session,
# 如果当前线程中有local那么就直接使用,没有则新创建一个

# 在sqlalchemy中使用了scoped_session的对象
session = scoped_session(Session)

2.2 类装饰器

scoped_session的对象,没有session中的方法,比如addcommit等等,但是可以使用加在类上的装饰器,给这个类增加属性或者方法

这样scoped_session的对象,就可以使用Session类中的方法

image-20230410183500441

def run():
    print('run方法')


def wrapper(my_class):
    def inner(*args,**kwargs):
        obj = my_class(*args,**kwargs)
        # my_class 类的对象增加 run方法
        obj.run = run
        return obj
    return inner


@wrapper
class Duck:
    pass

if __name__ == '__main__':
    obj = Duck()
    # 给类添加了装饰器后,可以给类的对象新增run方法
    obj.run() 

二、 基本增删改查和高级查询

1 基本增删改查

(1) 查

  • filter中写表达式
  • filter_by直接写等式

① 查询所有all 返回普通列表

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Good, Book

# 生成orm对象
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 拿到一个session类,绑定engine
Session = sessionmaker(bind=engine)
session = Session()


"查询"
res = session.query(Book).all()
sql_sent = session.query(Book)  # 不加all() 返回执行的sql语句
print(res) # 返回的是普通列表中 套对象 [<models.Book object at 0x10419da00>]
# # 如果在表模型中重写了__repr__方法,则可以定制列表的返回格式 [咖啡, 黑咖啡]
print

② 查询某几个字段


# select name as xx,id from book
res = session.query(Book.name.label('xx'),Book.id).all()
print(res)
ret = session.query(Book.name.label('xx'),Book.id)
for i in ret:
    print(i)

③ filter和filter_by

filter:传的是表达式, filter_by 传的是参数

res = session.query(Book).filter(Book.name == '咖啡').all()
print(res)

ret = session.query(Book).filter(Book.name != '咖啡').all()
print(ret)

④ 取了all()之后,结果是一个普通列表list,但是list没有first方法

res = session.query(Book).all().first() 
print(res)
---------
AttributeError: 'list' object has no attribute 'first'

⑤ 查询所有,使用占位符

text()中使用字符串,使用 :value :name占位符和.params()传递参数

res = session.query(Book).filter(text('id<:value and name=:name')).params(value=10,name='咖啡').all()
print(res)

⑥ 自定义查询

from_statement 写纯原生sql

res=session.query(User).from_statement(text("SELECT * FROM users where email=:email")).params(email='3@qq.com').all()
# print(type(res[0]))  # 是book的对象,但是查的是User表   不要这样写
print(res[0].name)  #

(2) 增

  • add()增加单个对象

  • add_all()增加一个列表[对象1,对象2,...]

"增"
good1 = Good(name='拿铁',category='咖啡',create_time=datetime.datetime.now(),desc='咖啡1')
good2 = Good(name='可口可乐',category='饮料',create_time=datetime.datetime.now(),desc='饮料')
book = Book(name='三国志')
session.add_all([good1,good2,book])
session.commit()

(3) 删

filterfilter_by查询的结果 不要allfirst,直接点 .delete()即可

delete后返回的结果是影响的数据库中的行数

good = session.query(Good).filter(Good.id > 7).all()
res= session.query(Good).filter(Good.id > 7).delete()
session.commit() # 1
print(good)
print(res)

(4) 改

① 方式1:update方法

# 方式1 update
res = session.query(Good).filter_by(id=7).update({'name':'美式咖啡'})
session.commit()

② 方式2:使用对象修改

good = session.query(Good).filter_by(id=7).first()
good.name='拿铁咖啡'
session.add(good) # add 如果有主键,就是修改,如果没有主键就是新增
session.commit()

2 高级查询

① and条件连接

条件之间用 逗号 连接

res = session.query(Book).filter(Book.id < 3, Book.name == '咖啡').all()
print(res)

② 范围查询:between关键字

res = session.query(Book).filter(Book.id.between(1,9)).all()
print(res)

③ in条件连接

由于in在python中是关键字,所以sqlalchemyin_表示in

res = session.query(Good).filter(Good.id.in_([1,3,5])).all()
print(res) # [phone, computer]

④ ~条件连接

~表示非,除。。外

res = session.query(Good).filter(~Good.id.in_([1,3,5])).all()
print(res) # [sound]

⑤ and 、or条件

from sqlalchemy import and_,or_
# and
res = session.query(Good).filter(and_(Good.id<3,Good.name=='phone')).all()
print(res)  # [phone]

# or
res = session.query(Good).filter(or_(Good.id>1,Good.name=='phone')).all()
print(res)  # [phone, sound, computer]

⑥ 通配符 与like搭配

  • %:匹配任意长度的任意字符。
  • _:匹配任意单个字符。
  • []:匹配指定字符集中的任意一个字符。例如,[aeiou]表示匹配任意一个元音字母,[0-9]表示匹配任意一个数字。
  • [^]:匹配除指定字符集外的任意一个字符。例如,[^aeiou]表示匹配任意一个非元音字母。
res = session.query(Good).filter(Good.name.like('p%')).all()
print(res)

⑦ 分页

# 一页放3条数据,查第7页
# 条 t  页 s
# [t*s:t*s+t]
res = session.query(Good)[3 * 7:3 * 7 + 3]

⑧ 排序 order_by 分组后排序

res = session.query(Good).order_by(Good.id.desc()).all() # desc降序
print(res) # [computer, sound, phone]
ret = session.query(Good).order_by(Good.id.asc()).all() # asc升序
print(ret)


# 2 先按照一个条件排序后,再按第二个条件排序
# res = session.query(Good).order_by(Good.id.desc(),Good.create_time.asc()).all()
res = session.query(Good).order_by(Good.id.desc()).all()
print(res)  # [computer, sound, phone] 

⑨ 分组查询 group_by

五个聚合函数max min sum count计数 avg

res = session.query(Good,func.min(Good.id)).group_by(Good.desc)

⑩ 分组之后的过滤 having

res = session.query(Good,func.max(Good.id),func.min(Good.id)).group_by(Good.desc).having(func.max(Good.id))

3 使用原生sql语句

3.1 在sqlalchemy中使用原生sql

(1) 方式1:使用引擎对象连接数据库,生成游标对象执行sql语句

" 方式1"

# 1 导入
from sqlalchemy import create_engine
# 2 生成引擎对象
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 3 使用引擎获取连接,操作数据库
conn = engine.raw_connection()
cursor = conn.cursor()

# 4 编写sql语句
sql_sent = 'select * from goods'
cursor.execute(sql_sent)

print(cursor.fetchall())

(2) 方式2:通过session对象来执行sql语句

" 方式1"

# # 1 导入
# from sqlalchemy import create_engine
# # 2 生成引擎对象
# engine = create_engine(
#     "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test",
#     max_overflow=0,  # 超过连接池大小外最多创建的连接
#     pool_size=5,  # 连接池大小
#     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
#     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
# )
# # 3 使用引擎获取连接,操作数据库
# conn = engine.raw_connection()
# cursor = conn.cursor()
#
# # 4 编写sql语句
# sql_sent = 'select * from goods'
# cursor.execute(sql_sent)
#
# print(cursor.fetchall())

"方式2"
from models import Good, Book
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

# 1 生成引擎engine对象
engine = create_engine(
    "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test"
)
# 2 生成session对象
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 3 通过session的execute方法来执行sql语句,并提交
cursor = session.execute(text('insert into books(name) value (:book_name)'),params={'book_name':'围城'})
session.commit()
print(cursor.lastrowid) # 返回最后一行数据的id
session.close()

3.2 在django中使用原生sql

``表名.objects.raw(sql)`执行sql语句,但是如果查询的不是模型表中的字段,而是其他表中的字段也可以查找到不会报错,但是会乱套不推荐使用


def index(request):
    res = Book.objects.raw('select * from app01_publish where id=1')  # RawQuerySet  用起来跟列表一样
    print(res[0])
    print(type(res[0]))
    print(res[0].name)
    # book 没有addr,但是也打印出来了
    print(res[0].addr)

    return HttpResponse('ok')

4 表关系:一对多

4.1 表模型的创建

relationship跟数据库无关,不会新增字段,只用于快速连表操作,backref用于反向查询

# 一对多关系
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    # 关联字段写在多的一方,写在Person中,跟hobby表中id字段做外键关联
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:就要加这个字段,取对象  person.hobby     pserson.hobby_id
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')  # 如果有hobby对象,拿到所有人 hobby.pers

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root:root123@127.0.0.1:3306/flask_test", )

# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

4.2 新增

(1)新增方式1:通过表模型来新增

① 不带外键

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models2 import Hobby, Person

# 1 生成引擎对象
engine = create_engine("mysql+pymysql://root:root123@127.0.0.1:3306/flask_test", )

# 2 生成session对象
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

"一对多新增:通过模型对象来新增,但是外键之间不会关联"
#  1 生成模型对象
hobby = Hobby(caption='购物')
# 2 session.add新增
session.add(hobby)
person = Person(name='石购物')
session.add(person)

# 3 提交
session.commit()
image-20230410215007295

② 带外键

"新增:带外键"
hobby = session.query(Hobby).filter(Hobby.caption=='购物').first()

person = Person(name='孙燕姿',hobby_id=hobby.id)
session.add(person)

session.commit()

(2)新增方式2:按对象的方式新增

支持按对象的增加方式,必须加relationship 做关联

① 新增一个对象

新增的对象有与之通过relationship字段关联的表后,就可以直接传入对象来新增

hobby = session.query(Hobby).filter(Hobby.caption == '听歌').first()
person = Person(name='王力宏', hobby=hobby)
session.add(person)
session.commit()

② 新增多个对象

当多个对象都没有的时候,可以用add_all()方法来新增列表中的对象

hobby = Hobby(caption='吃饭')
person = Person(name='胡吃饭', hobby=hobby)
session.add_all([hobby, person])
session.commit()

4.3 基于对象的查询

(1) 正向查询:查询有relationship字段的表

person = session.query(Person).filter(Person.name =='王听歌').first()
print(person) # 王听歌
print(person.hobby) # 听歌
print(person.hobby.id) # 1
print(person.hobby_id) # 1

(2) 反向查询:查询没有relationship字段的表,按照backref属性查询

由于是一对多关系,所以反向查询时,返回的是列表

hobby = session.query(Hobby).filter(Hobby.id==4).first()
# 反向查询时,点 的是relationship字段的 backref=pers 属性
print(hobby) # 购物
print(hobby.pers) # [石购物, 孙燕姿] 由于是一对多反向查询时,返回的是列表
for person in hobby.pers:

    print(person.name)

5 表关系:多对多

5.1 表模型

多对多的表模型,表与表之间的关系的第三章表,需要我们手动创建

外键字段放在那一张表中都可以

# 一对多关系
from sqlalchemy import create_engine

from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


# 多对多

# 中间表  手动创建
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)



    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root:root123@127.0.0.1:3306/flask_test", )



# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

5.2 新增

(1) 新增方式1:通过表模型来新增

girl = Girl(name='丽桑卓')
boy = Boy(name='马尔扎哈')

# 先生成模型对象,新增模型对象后在创建他们之间的关系
session.add_all([girl, boy])
session.commit()

# 在关系表中创建关系
session.add(Boy2Girl(girl_id=1, boy_id=1))
session.commit()

(2)新增方式2:按对象的方式新增

使用relationship来创建对象,这样新增的对象,在两张表中都会新增数据,并且会在第3张关系表中自动新增好彼此的关系

boy = Boy(name='派克')
boy.girls = [Girl(name='阿狸'), Girl(name='厄运')]
session.add(boy)
session.commit()

5.3 基于对象的跨表查询

由于关系是多对多,所以查询出来的结果都是列表

(1) 正向查询:查询有relationship字段的表

boy = session.query(Boy).filter(Boy.id == 2).first()
print(boy.girls)  # [阿狸, 厄运]

(2) 反向查询:查询没有relationship字段的表,按照backref属性查询

girl = session.query(Girl).filter(Girl.id == 2).first()
# 使用relationship字段中的 backref属性
print(girl.boys)  # [派克]

6 连表查询

(1) 指定字段连接

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models2 import Person,Hobby
# 1 生成引擎对象
engine = create_engine("mysql+pymysql://root:root123@127.0.0.1:3306/flask_test", )

# 2 生成session对象
Session = sessionmaker(bind=engine)
session = scoped_session(Session)



"指定字段连接"
res = session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id).all()
print(res)
# [(王听歌, 听歌), (李打游戏, 打游戏), (张看电影, 看电影), (赵听歌, 听歌), (石购物, 购物), (孙燕姿, 购物), (王力宏, 听歌), (胡吃饭, 吃饭)]

三、 Flask集成sqlalchemy

1 flask-sqlalchemy:操作数据库

1.1 简介

flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作,操作数据库需要先创建一个db对象

1.2 快速使用

(1)安装

pip install flask-sqlalchemy

(2)Flask中快速使用

① 导入flask-sqlalchemy

from flask_sqlalchemy import SQLAlchemy

② 实例化得到db对象

db = SQLAlchemy()

③ 将db注册到app中

db.init_app(app)

④ 视图函数中使用session

此时的session已经是线程安全的了,不会出现多线程中数据混乱的问题了

from session_sql import session

@app.route('/')
def index():
    session.add(Book(name='金萍媒'))
    session.commit()
    session.close()
    return '增加成功'

⑤ models.py 中继承db.Model

字段使用db对象中自带的db.Columndb.Integerdb.ForeignKey

# 导入db对象
from manage import db

# 多对多

# 中间表  手动创建
class Boy2Girl(db.Model):
    __tablename__ = 'boy2girl'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    girl_id = db.Column(db.Integer, db.ForeignKey('girl.id'))
    boy_id = db.Column(db.Integer, db.ForeignKey('boy.id'))


class Girl(db.Model):
    __tablename__ = 'girl'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(db.Model):
    __tablename__ = 'boy'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    girls = db.relationship('Girl', secondary='boy2girl', backref='boys')

⑥ 在Flask项目的配置文件中配置好

  • manage.py中
app.config.from_pyfile('settings.py')
  • settings.py中
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:root123@127.0.0.1:3306/flask_test?charset=utf8"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1
# 追踪对象的修改并且发送信号
SQLALCHEMY_TRACK_MODIFICATIONS = False


(3)配置项说明

配置选项 说明
SQLALCHEMY_DATABASE_URI 连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS 一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO 调试设置为true
SQLALCHEMY_POOL_SIZE 数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT 连接超时时间
SQLALCHEMY_POOL_RECYCLE 自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。

2 flask-migrate库:表的映射

创建好表后,修改表的字段,需要映射到数据库,原生的sqlalchemy,不支持修改表的字段,需要用到flask-migrate

2.1 快速使用

提前需要安装flask:2.2.2 , flask-script:2.0.3

(1)安装flask-migrate

pip3 install flask-migrate==2.7.0

(2)注册flask-migrate和修改启动文件

① 在app所在文件中导入

from flask_script import Manager, Server
from flask_migrate import Migrate, MigrateCommand

② 生成manager对象,管理app和数据库

manager = Manager(app)
Migrate(app, db)
manager.add_command('db', MigrateCommand) # 创建数据库映射命令
# manager.add_command('start',Server(port=6666,use_debugger=True)) # 创建启动命令

③ 修改app.run()manager.run()

以后使用python manage.py runserver 启动项目

python3 manage.py runserver 

④ flask-migrate中的命令

python manage.py db init  # 生成一个migrations文件夹,里面以后不要动,记录迁移的编号

执行了 init命令后,会出现一个migrations文件夹,其中包括了数据库迁移的记录

  • 以后在models.py 写表,加字段,删字段,改参数
# 以后在models.py 写表,加字段,删字段,改参数
> python manage.py db migrate # 记录
> python manage.py db upgrade # 与数据同步

3 flask项目演示

# 0 创建数据库 movie
# 1 pycharm打开项目
# 3 在models中,注释,解开注释,右键执行,迁移表
# 4 在models中恢复成原来的
# 5 在命令行中python manage.py runserver运行项目
# 6 访问前台和后台