Flask2.0基础教程

发布时间 2023-10-09 21:19:36作者: 韩志超

Flask基础

image

Flask介绍

参考:Flask官方文档

Flask 是一个用 Python 编写的轻量级 Web 应用框架。它的核心非常简单,但是可以通过各种插件来扩展,使其可以用来构建复杂的 Web 应用。Flask 的设计目标是保持核心简单且易于使用,同时能够被扩展以适应不同的应用需求。

Flask框架主要特点:

  • 轻量级:Flask 本身只提供了最基础的 Web 功能,如 URL 路由、请求和响应处理等。这使得 Flask 非常轻量,易于学习和使用。
  • 易于扩展:虽然 Flask 本身功能有限,但是它可以通过各种插件来扩展,如数据库操作(Flask-SQLAlchemy)、表单处理(Flask-WTF)、用户认证(Flask-Login)等。
  • Jinja2 模板引擎:Flask 使用 Jinja2 作为其模板引擎,可以方便地生成动态 HTML 内容。

常用插件

Flask 常用插件

  • 数据库操作:Flask-SQLAlchemy           pip install flask-sqlalchemy
  • 数据库迁移:Flask-Migrate                   pip install flask-migrate
  • 表单处理:Flask-WTF                           pip install flask-wtf
  • 后台管理:Flask-Admin                        pip install flask-admin
  • 用户认证:Flask-Login                         pip install flask-login
  • Token认证: Flask-JWT-Extended         pip install flask-jwt-extended
  • 接口频率限制:Flask-Limiter               pip install flask-limiter
  • 邮件发送:Flask-Mail                           pip install flask-mail
  • 密码生成:Flask-Bcypt                         pip install flask-bcypt
  • 缓存:FLask-Caching                           pip intall flask-caching
  • 页面调试:Flask-DebugToobar             pip install flask-debugtoolbar
  • 静态文静缓存:Flask-Static-Digest        pip install flask-static-digest

快速上手

  1. 安装Flask:
$ pip install flask

新建app.py

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"
  1. 命令行运行
$ flask run

常用命令

Flask常用命令

  • flask run: 启动开发服务器,常用参数
    --app: 指定模块及应用名(默认模块为app.py,应用为app或application),例如:flask --app hello:myapp run
    --debug: 启用调试模式
    --reload: 修改代码后自动重启服务
    --env-file: 指定.env环境配置
    --host/-h:指定主机地址,例如 --host 0.0.0.0
    --port/-p:指定端口,例如 --port 5001
  • flask shell: ½øÈëflask应用环境(可以执行数据库操作)

Flask Migrate插件常用命令

  • flask db init:初始化迁移目录
  • flask db migrate: 生成数据库迁移
  • flask db upgrade: 执行数据迁移

Flask路由配置

URL重定向

  • @app.route(‘/login/‘) 支持 .../login/ 和 .../login 访问 (自动重定向到/login/)
  • @app.route(‘/login‘) 只支持 ../login 访问

请求方法限制

  • @app.route('/login', methods=['GET', 'POST’])
    只接受单个请求方法也可以使用 @app.post(‘/login’) 或 @app.get(‘/login’)

路径参数 @app.route(‘/user/string:username’) , 参数类型

参数类型 说明
string (缺省值) 接受任何不包含斜杠的文本
int 接受正整数
float 接受正浮点数
path 类似 string ,但可以包含斜杠
uuid 接受 UUID 字符串

通过接口函数名反查URL

from flask import url_for
print(url_for(login’))

静态文件URL: url_for('static', filename='style.css')

Flask请求参数获取

  • URL路径参数获取:在接口函数中添加URL对应的参数来获取
  • Query参数获取:·name = request.args.get(‘name’)
  • POST表单请求数据:request.form.get(‘name’)
  • JSON格式数据获取:request.json.get(‘name’)
  • 获取上传文件:file = request.files.get(‘file_name’); file.save(f‘/uploads/{file.filename}’
  • 获取Cookie参数:request.cookies.get(‘token’)

会话

参考:quickstart

除了请求对象之外还有一种称为 session 的对象,允许我们在不同请求之间储存信息。这个对象相当于用密钥签名加密的 cookie ,即用户可以查看您的 cookie ,但是如果没有密钥就无法修改它。

  • 使用方法:from flask import session
  • 添加会话变量:sesstion[‘username’] = ...
  • 删除会话变量:session.pop(‘username’)

Flask返回响应

返回文本或HTML

return ''<h2>Hello</h2>''

渲染并返回页面模板

return render_template(‘user.html’, id=1)

返回JSON数据,直接返回一个字典/列表或使用jsonify()

return {‘code’:0, ‘msg’: ‘success’}
return jsonify({‘code’:0, ‘msg’: ‘success’})

指定状态码和响应头

return render_template(‘error.html’), 404, {‘test’: ‘abc’}

指定状态码和响应头

return render_template(‘error.html’), 404, {‘test’: ‘abc’}
# 或者 return make_response(render_template(‘error.html’), 404, {‘test’: ‘abc’})

返回文件流或文件下载

from flask import send_file; send_file(‘/path/a.png’, ‘image/png’, as_attachment=True)
from flask import send_file_from_directory(‘/path’, ‘a.png’, as_attachement=True)

返回提示消息

  • flash(): 发送提示消息 模板中使用 get_flashed_message() 来获取消息
    返回重定向
return redirect(‘/400’)

Flask返回响应-异常处理

抛出异常

abort(404)

注册异常处理函数

@app.errorhandler(404)
def error_404(error):
    return render_template(‘404.html'), 404


@app.errorhandler(Exception)
def error_unknown(error):
    return render_template(‘500.html’), 500

JinJa2模板引擎

Jinja2介绍

参考:Jinja2官方文档

Jinja2 是一个用于 Python 的强大的模板引擎,它被广泛用于各种 Web 开发框架中,包括 Flask。Jinja2 提供了一种简单的方式来动态地生成 HTML 或其他标记语言。

Jinja2主要特性:

  • 变量替换:你可以在模板中使用双大括号 {{ }} 来插入变量,例如 {{ name }}。当模板被渲染时,这些变量将被实际的值替换。
  • 控制结构:Jinja2 支持多种控制结构,包括条件语句({% if %}、{% else %}、{% elif %})和循环语句({% for %})。这使得你可以在模板中进行复杂的逻辑处理。
  • 模板继承:Jinja2 支持模板继承,这意味着你可以定义一个基础模板(包含一些通用的元素,如页头、页脚等),然后创建多个继承自这个基础模板的子模板。这可以避免重复代码,使得模板更易于管理。
  • 过滤器:·Jinja2 提供了一系列的过滤器,可以用来修改变量的值。例如,{{ name|lower }} 将把 name 变量的值转换为小写。你也可以定义自己的过滤器。
  • 自动转义:为了防止跨站脚本攻击(XSS),Jinja2 默认会自动转义所有的变量。这意味着如果变量的值包含 HTML 代码,这些代码将被转义为对应的 HTML 实体,而不会被浏览器解析执行。
  • 宏:Jinja2 的宏类似于 Python 中的函数,可以用来封装可重用的模板片段。

Jinja2基础使用

安装Flask已默认安装,独立安装方法: pip install jinja2
引用变量

<h1>Hello, {{ name }}!</h1> 

支持使用过滤器

<h1>Hello, {{ name|capitalize }}!</h1>

if判断

{% if name %} 
<h1>Hello, {{ name }}!</h1>
 {% else %} 
<h1>Hello, Stranger!</h1> 
{% endif %}

for循环

{% for user in users %}
<tr class="{% if loop.odd %}odd{% else %}even{% endif %}"> 
<td>{{ loop.index }}</td> 
<td>{{ user.name }}</td>
 <td>{{ user.email }}</td>
 </tr>
{% endfor %}

Jinja2模板嵌套

Jinja2 提供了模板继承的功能,这使得你可以创建一个基础模板(通常称为 "父模板" 或 "基模板"),然后创建多个继承自基模板的子模板。这样可以避免在多个模板中重复相同的代码。

base.html

<!DOCTYPE html>
<html>
<head>
  <title>{% block title %}{% endblock %}</title>
</head>
<body>
  <header> <h1>My Website</h1> </header>
  <main> {% block content %}{% endblock %}</main>
  <footer> <p>Copyright &copy; 2022</p> </footer>
</body>
</html>

在这个模板中,{% block title %}{% block content %} 是两个块。子模板可以覆盖这些块来插入自己的内容。

index.html(继承base.html)

{% extends “base.html” %}

{% block title %} Home Page {% endblock %}

{% block content %}
  <h2>Welcome to my website!</h2>
  <p>This is the home page.</p>
{% endblock %}

Jinja2使用宏

在 Jinja2 中,宏类似于 Python 中的函数,可以用来封装可重用的模板片段。
你可以使用 {% macro %} {% endmacro %} 标签来定义一个宏,例如:

{% macro render_user(user) %}
  <p>Name: {{ user.name }}</p>
  <p>Email: {{ user.email }}</p>
{% endmacro %}

在这个例子中,render_user 是一个宏,它接受一个参数 user,并生成一个包含用户的名字和电子邮件的段落。你可以像调用函数一样调用宏,例如:

{% for user in users %}
  {{ render_user(user) }}
{% endfor %}

你也可以将宏保存在一个单独的文件中,然后在其他模板中导入它。例如,如果你将上面的宏保存在 macros.html 文件中,你可以这样导入它:

{% from “macros.html” import render_user %}

{% for user in users %}
  {{ render_user(user) }}
{% endfor %}

Flask数据库操作

SQLAlchemy介绍

SQLAlchemy 是一个用于 Python 的 SQL 工具包和对象关系映射(ORM)系统。它为高效和高性能的数据库访问提供了全面的企业级持久性模型

SQLAlchemy主要特点

  • 对象关系映射(ORM):SQLAlchemy 提供了一个全功能的 ORM,它允许开发者以面向对象的方式处理数据库中的数据。你可以定义数据模型(即类),SQLAlchemy 会自动将它们映射到数据库表。
  • 数据表达语言(DDL):SQLAlchemy 提供了一种 Pythonic 的方式来生成和执行 SQL 语句,包括创建和删除表,插入、更新和删除数据等。
  • SQL 表达语言:SQLAlchemy 提供了一种构造 SQL 查询的 DSL(领域特定语言)。这种 DSL 提供了丰富的查询构造选项,并且可以跨多种数据库后端使用。
  • 数据库抽象层:SQLAlchemy 提供了一种数据库抽象层,使得你可以使用相同的代码来操作不同的数据库系统(如 MySQL、PostgreSQL、SQLite 等)。
  • 事务和会话管理:SQLAlchemy 提供了强大的事务和会话管理功能,使得你可以方便地处理数据库事务。
  • 连接池:SQLAlchemy 内置了连接池功能,可以有效地管理数据库连接,提高应用性能。

Flask-SQLAlchemy使用

Flask-SQLAlchemy参考

Flask-SQLAlchemy 是一个为 Flask 应用提供 SQLAlchemy 支持的扩展,它简化了 SQLAlchemy 的使用,使得在 Flask 应用中进行数据库操作变得更加方便。

  1. 安装方法:
$ pip install flask-sqlalchemy
  1. 初始化插件
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////db.sqlite'
db = SQLAlchemy(app)
  1. 定义模型
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    ...
  1. 创建表
with app.app_context():
    db.create_all()

Flask-Migrate使用

Flask-Migrate 为 Flask 和SQLAlchemy 添加了数据库迁移的扩展名了

  1. 安装方法
$ pip install flask-migrate
  1. 初始化插件
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////db.sqlite'
db = SQLAlchemy(app)

migrate = Migrate(app, db)  # 添加此行
  1. 使用方法
$ flask db init           # 初始化migrations目录
$ flask db migrate    # 生成迁移脚本
$ flask db upgrade   # 执行迁移

模型声明

声明式模型

class Role(db.Model):
    __tablename__ = 'roles'  # 指定表名,默认为类名小写
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False, comment="角色名称”)
    description = db.Column(db.Text, comment="角色描述")

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False, comment="用户名称")
    email = db.Column(db.String(120), unique=True, comment="邮件”)
    age = db.Column(db.Integer, comment=“年龄”)
    role_id = db.Column(db.Integer, db.ForeignKey("roles.id", comment="角色id")  # 外键

    role = db.reletionship("Role", backref="users")  # 关系字段(非该表字段)

描述型模型

Role = Table(
    'roles',   # 表名
    db.Model.metadata,  # 元数据配置
    db.Column(‘id’, db.Integer, primary_key=True),
    db.Column(‘name’, db.String(80), nullable=False, comment="角色名称"),
    db.Column(‘description’, db.Text, nullable=False, comment="角色描述"),
)

常见字段类型

字段(db.Column)常见类型

  • db.Integer 整型
  • db.String (size) 字符串,size 为最大长度,比如 db.String(20)
  • db.Text 长文本
  • db.DateTime 时间日期,Python datetime 对象
  • db.Float 浮点数
  • db.Boolean 布尔值

字段(db.Column)常用参数

  • primary_key 是否主键
  • unique 是否唯一
  • index 是否索引列
  • nullable 是否允许为null,默认为允许 (nullable=True)
  • default 插入/修改时字段的默认值,如 default=1
  • server_default 在创建表结构时,声明字段默认值,必须为字符串类型,如 server_default=text(¡°1¡±)
  • onupdate 设置字段更新时的处理函数
  • comment 在创建表结构时,为字段添加字段说明

模型增删改查

增加

user = User(name='kevin', email='kevin@gmail.com')
db.session.add(user)
db.session.commit()

删除

user = User.query.get(user_id)
db.session.delete(user)
db.session.commit()

修改

user = User.query.get(user_id)
user.email = 'kevin@hotmail.com'
db.session.add(user)
db.session.commit()

查询
查询所有

users = User.query.all()

查询单个

# 通过主键id查询(查询不到返回None)
user = User.query.get(user_id)
# 通过属性过滤并取第一个
user = User.query.filter_by(name='kevin').first()
# 或 user = User.query.filter(User.name =='kevin').first()

查询多个

users = User.query.filter(User.email.like('%gmail.com%').all()

# 多条件筛选
users = User.query.filter(User.name.like('%kevin%').filter(User.email.like('%gmail.com%')).all()

# 或使用and_
users = User.query.filter(
    and_(User.name.like('%kevin%'),
            User.email.like('%gmail.com%')
    ).all()

# 多条件筛选-or
users = User.query.filter(
    or_(User.name.like('%kevin%'),
          User.email.like('%gmail.com%')
    ).all()

# 联表查询
users = User.query.join(Role, User.role_id==Role.id).filter(Role.name == 'Admin').all()

详细查询方法

模型query方法

  • User.query.get(id) 根据主键查询,返回指定主键值的记录,如果未找到,则返回 None
  • User.query.get_or_404(id) 根据主键查询,如果未找到,则返回 404 错误响应
  • User.query.first() 返回查询的第一条记录,如果未找到,则返回 None
  • User.query.first_or_404() 404 错误响应
  • User.query.all() 返回包含所有查询记录的列表
  • User.query.count() 返回查询结果的数量
  • User.query.filter() 使用指定的规则过滤记录,返回新产生的查询对象
  • User.query.filter_by() 以关键字表达式形式过滤纪录,返回新产生的查询对象
  • User.query.join() 联表查询,返回新产生的查询对象
  • User.query.order_by() 根据指定条件对记录进行排序,返回新产生的查询对象
  • User.query.paginate() 返回一个 Pagination 对象,可以对记录进行分页处理

使用db.session

增加/修改

  • db.session.add(obj)
    删除
  • db.session.delete(obj)
    提交
  • db.session.flash(): 预提交
  • db.session.commit(): 提交
    查询
  • result = db.session.query(User.age, func.count(User.id)).group_by(User.age).having(User.age<18).all()
    执行(查询或修改)
  • db.session.execute(select(User).where(User.age < 18)).scalars()
  • db.session.exectue(insert(User).values(username=“kevin”, email=“kevin@gmail.com”, age=16)

关系字段db.relationship

关系字段(db.relationship)常用参数

  • backref 在关系的另一模型中添加反向引用,用于找到父表 (单向关系声明)
  • back_populates 关联另一模型中指向该表的关系字段(双向关系声明)
  • primary join 明确指定两个模型之间使用的联结条件
  • uselist 如果为False,不使用列表,而使用标量值
  • order_by 指定关系中记录的排序方式
  • secondary 指定多对多中记录的排序方式
  • secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件
  • foreign_keys 指定关联的本表中外键字段,当有多个指向同一表的外键时需要手动指定,如  creator = db.releation_ship("User", foreign_keys="Book.creator_id")
  • lazy 关系字段数据加载方式,默认lazy='select' 自动加载

关系字段加载(lazy)选项

  • select (默认) 多表查询方式自动加载关系字段数据
  • joined 联表(join)查询方式自动加载关系字段数据
  • subquery 子查询方式自动加载关系字段数据
  • dynamic 动态加载,在使用 <关系字段>.all()/.filter()/.first()等方法时再进行加载(查询),dynamic仅支持一对多父表中关联子表的关系字段,或多对多关系字段
  • noload 不加载关系字段数据

一对多关系

外键声明
子表中添加父表的外键 如,

role_id = db.Column(db.Interger, db.ForeignKey("roles.id"),# 注意!!!,ForeignKey中是父表的"表名.id"

单向关系声明(可选,一对多关系建议使用该方式)
如果需要引用父表或子表的数据,可以在子表 父表中添加关系字段,并添加反向引用字段
子表添加关系字段:

class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    role_id = db.Column(db.Integer, db.ForeignKey(“roles.id”))

    # 子表添加关系字段
    role = db.relationship("Role", backref="users")

或 父表添加关系字段:

class Role(db.Model):
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    # 父表添加关系字段
    users = db.relationship("User", backref="role")

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    role_id = db.Column(db.Integer, db.ForeignKey("roles.id"))

双向关系声明(可选)
也可以在子表和父表中都添加关系字段,并使用back_populates关联两个字段

class Role(db.Model):
    # ...
    users = db.relationship("User", back_populates="role")

class User(db.Model):
    # ...
    role = db.relationship("Role", back_populates="users")

自关联
自关联关系声明
表中可以使用外键关联本身,形成自关联(嵌套),自关联表关系字段需要设置remote_side=[id]

class Category(db.Model):
    __tablename__ = 'categories'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False)
    description = db.Column(db.String(500))
    parent_id = db.Column(db.Integer, db.ForeignKey(“categories.id”))    # 外键指向当前表
    parent = db.relationship(“Category”, remote_side=[id], backref=“children”)   # 不支持lazy=dynamic
    # 或 children = db.relationship("Category", remote_side=[id], backref="parent")

自关联关系创建

# 创建上级分类
c1 = Category(name='分类1')
db.session.add(c1)
db.session.commit()

# 创建子分类
c1_1 = Category(name='分类1-1', parent_id=c1.id)
db.session.add(c1)
db.session.commit()

自关关系查询

# 通过父对象查询子对象
c1 = Category.query.get(1)
for sub_category in c1.children():
    print(sub_category.name)

# 通过子对象查询父对象
c1_1 = Category.query.get(2)
print(c1_1.parent.name)

多个外键
多个外键指向同一个表
当子表中有多个外键指向同一个父表时,关系字段需要显示声明foreign_keys,关联本表哪个外键字段

class User(db.Model):
    # ...

class TestCase(db.Model):
    __tablename__ = 'testcases'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False)
    description = db.Column(db.String(500))
    create_time = db.Column(db.DateTime, server_default=func.now(), comment='创建时间')
    update_time = db.Column(db.DateTime, server_default=func.now(), onupdate=func.now(), comment='修改时间')
    creator_id = db.Column(db.Integer, db.ForeignKey('user.id'))   # 外键字段
    operator_id = db.Column(db.Integer, db.ForeignKey('user.id'))   # 外键字段

    creator = db.relationship("User", backref="created_testcases", foreign_keys="TestCase.creator_id")
    operator = db.relationship("User", backref="updated_testcases", foreign_keys="TestCase.operator_id")

一对多关系操作
关系查询(lazy=select(默认)/joined/subquery)
子表获取父表信息,如,

user = User.query.get(user_id)
print(user.role.name)

父表获取子表信息,如

role = Role.query.get(role_id)
for user in role.users:
    print(user.name)

关系查询(lazy=dynamic)
子表获取父表信息,如,

user = User.query.get(user_id)
role = user.role.first()
print(role.name)

父表获取子表信息,如

role = Role.query.get(role_id)
for user in role.users.all():
    print(user.name)

关系查询(lazy=noload)
子表获取父表信息,如

user = User.query.get(user_id)
role = Role.query.filter(Role.id == user.role_id)
print(role.name)

父表获取子表信息,如

role = Role.query.get(role_id)
users = User.query.filter(User.role_id==role.id).all()
for user in users:
    print(user.name)

注意,在filter筛选时不能直接使用关系字段属性进行筛选
User.query.filter_by(user.role.name='Admin')  ❌
User.query.filter(User.role.name=='Admin')   ❌
正确的使用方式为
User.join(Role).filter(Role.name=='Admin')  或
db.session.query(User,Role).filter(Role.name=='Admin')

一对一关系

外键声明
子表中添加父表的外键 如,

role_id = db.Column(db.Interger, db.ForeignKey("roles.id"),# 注意!!!,ForeignKey中是父表的"表名.id"

双项关系声明(可选,一对一建议使用该方式,userlist=False不支持 lazy=dynamic)
如果需要引用父表或子表的数据,可以在子表和父表中添加关系字段,并设置userlist=False

class User(db.Model):
    # ...
    profile = db.relationship("Profile", back_populates="user", uselist=False)  # 添加一对一关系

class Profile(db.Model):     # 附表
    __tablename__ = 'profiles'
    id = db.Column(db.Integer, primary_key=True)
    phone = db.Column(db.String(11))
    address = db.Column(db.String(500))
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))  # 添加主表外键

    user = db.relationship('User', back_populates='profile', uselist=False)  # 添加一对一关系

关系查询(不支持lasy=dynamic)
主表获取附表信息,如,

user = User.query.get(user_id)
print(user.profile.phone)

附表获取主表信息,如

profile = Profile.query.get(profile_id)
print(profile.user.name)

模型关系-多对多关系

使用标准中间表
多对多实际上是通过一个包含两者外键的中间表实现的,通过添加关系字段,可以略过中间表,直接获取关联的数据
可以通过Table创建一个(标准的)中间表,示例如下:

class User(db.Model):
    __tablename__ = 'users'
    # ..

ProjectUser = Table(   # 中间表
    'projects_users',  # 表名
    db.Model.metadata,  # 元数据配置
    db.Column(‘project_id’, db.Integer, db.ForeignKey('projects.id'), primary_key=True),
    db.Column(‘user_id’, db.Integer, db.ForeignKey('users.id'), primary_key=True),
)

class Project(db.Model):
    __tablename__ = 'projects'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), unique=True, nullable=False)
    description = db.Column(db.String(500))

    # 关系字段
    users = db.relationship("User", secondary=ProjectUser, backref="projects", lazy="dynamic")

创建多对多关系

project = Project(name='项目1')

for user_id in user_ids:  # 如 [1, 2, 3]
    user = User.query.get(user_id)
    project.user.append(user)

db.session.add(project)
db.session.commit()

查询多对多关系

project = Project.query.get(project_id)

for user in project.users.all()  # lazy=dynamic方式
    print(user.name)

使用带额外字段的独立中间表
如果需要在中间表中添加额外字段,可以使用声明的方式建立中间表(当然用Table描述的方式也可以)

class User(db.Model):
    __tablename__ = 'users'
    # ..

class ProjectUser(db.Model):  # 中间表
    __tablename__ = 'projects_users'
    project_id = db.Column(db.Integer, db.ForeignKey('projects.id'), primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True)
    role = db.Column(db.String(30))

    project = db.relationship('Project', backref='related_users')  # 关系字段,也可以在主表中声明 (以使用lazy=dynamic) 
    user = db.relationship('User', backref='related_projects')     # 关系字段,也可以在主表中声明(以使用lazy=dynamic) 

class Project(db.Model):
    __tablename__ = 'projects'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), unique=True, nullable=False)
    description = db.Column(db.String(500))

创建多对多关系

project = Project(name='项目1')
db.session.add(project)
db.session.commit()     # 先要创建出项目,这样才有project.id

for user_id in user_ids:  # 创建每一个中间表对象,并关联两个表
    project_user = ProjectUser(project_id=project.id,user_id=user_id, role='QA')
    db.session.add(project_user)
    db.session.commit()

查询多对多关系

project = Project.query.get(project_id)

for m in project.related_users:
    print(m.user.name)  # 通过中间表对象m中的关系字段user

Flask项目结构搭建

蓝图(Blueprint)介绍

在 Flask 中,“蓝图”(Blueprint),即路由嵌套,是一种组织路由和视图函数的方式,它允许你将应用分割成一组相关的部分。每个蓝图都可以有自己的路由、视图函数、模板文件、静态文件等。你可以在一个蓝图中定义一组相关的路由,然后在另一个蓝图中定义另一组相关的路由。这样可以使你的应用更加模块化,更容易维护。

from flask import Flask, Blueprint
app = Flask(__name__)

api_bp = Blueprint('api', __name__, url_prefix='/api')        # 接口根路由(蓝图)
user_bp = Blueprint('user', __name__, url_prefix='/user')   # 用户接口路由(蓝图)
api_bp.register_blueprint(user_bp)   # 注册用户接口路由(蓝图)到api
app.register_blueprint(api_bp)         # 注册主接口路由(蓝图)到app

@user_bp.route('/list')                     # 最终接口地址为 /api/user/list
def user_list(id):
    users = User.query.all()
    return [{'id': user.id, 'name': user.name} for user in users]

项目结构搭建

Flask项目并没有标准的项目结构,每个人根据自己的规划搭建出来的项目结构也是不同的。总体来说合理的组织,确保可启动、可配置、可扩展、模型可被发现即可。
一般来说可以分为按功能聚合(单应用) 和 按模型(或应用)聚合 两种

按功能聚合(单应用)结构参考

project /
  |-- apis /
  |    |-- __init__.py   # 主路由及自路由注册
  |    |-- base.py       # 接口基础配置
  |    |-- user.py       # 用户相关接口
  |    |-- testcase.py
  |-- models /
  |    |-- __init__.py   # 暴露内部的模型
  |    |-- base.py      # 模型基础类
  |    |-- user.py       # 用户相关模型
  |    |-- testcase.py
  |-- config.py  # 数据库、插件等配置
  |-- app.py      # 应用创建及启动

按模型(或应用)聚合结构参考

project /
  |-- user /
  |    |-- __init__.py   # 主路由及自路由注册
  |    |-- apis.py       # 用户相关接口
  |    |-- models.py
  |-- testcase /
  |    |-- __init__.py   # 暴露内部的模型
  |    |-- apis.py      # 模型基础类
  |    |-- models.py       # 用户相关模型
  |-- config.py  # 数据库、插件等配置
  |-- app.py      # 应用创建及启动

注:对于非前后端分离式项目,项目结构中会有templates/ static/等目录,接口模块一般使用views.py 或 views/目录,而在前后端分离式项目中,我们一般使用apis.py或apis/目录

简单项目结构参考
以按功能聚合为例,一种简单的结构如下:

项目配置 config.py 内容 ```python from pathlib import Path from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from sqlalchemy import MetaData

ROOT_PATH = Path(file).parent # 项目根目录

插件配置

db = SQLAlchemy()
migrate = Migrate(db=db)

EXTENSIONS = [db, migrate]

APP配置

SECRET_KEY = 'secret'
APP_CONFIG = {
'SQLALCHEMY_DATABASE_URI': f"sqlite:///{ROOT_PATH}/db.sqlite"
}

</details>

<details>
<summary>项目主程序 app.py 内容参考</summary>

```python
from flask import Flask
from apis import api_bp  # 从 apis/__init__.py中导入api_bp
from config import APP_CONFIG, EXTENSIONS, SECRET_KEY

def make_app():
    # 创建app
    app = Flask(__name__)
    app.secret_key = SECRET_KEY # 配置app
    app.config.update(APP_CONFIG)

    for ext in EXTENSIONS:  # 注册插件
        ext.init_app(app)

    app.register_blueprint(api_bp)   # 注册接口根路由

    @app.errorhandler(Exception) # 注册异常处理函数
    def error_500(error):
        app.logger.exception(error)
        return {'code': 500, 'msg': str(error), ‘data’: None}, 500

    return app
模型基础类 models/base.py 内容参考 ```python from sqlalchemy import desc, inspect from settings import db

class BaseModel(db.Model):
abstract = True  # 抽象模型
fields = None    # 自定义字段, 序列化字段配置
order_by = 'id' # 自定义字段,排序字段
id = db.Column(db.Integer, primary_key=True)  # 默认id主键字段

@classmethod
def get(cls, id=None, kwargs):  # 查询单个对象
if id is not None:
return cls.query.get(id)
return cls.query.filter_by(
kwargs).first()

@classmethod
def list(cls, order_by: str = None, kwargs):  # 查询列表
order_by = order_by or cls.order_by
if order_by.startswith(‘-’):  # 如果配置为 '-id' 则按降序排序
order_by = desc(order_by[1:])
fields = cls.get_fields()
kwargs = {key: value for key, value in kwargs.items()
if key in fields and value is not None}
qs = cls.query.filter_by(
kwargs)
if order_by is not None:
return qs.order_by(order_by)
return qs.all()

@classmethod
def create(cls, kwargs):  # 创建对象
obj = cls(
kwargs)
return obj.save()

def save(self, commit=True):  # 保存对象(创建或更新时使用)
db.session.add(self)
db.session.flush()
if commit:
db.session.commit()
return self

def update(self, commit=True, **kwargs):  # 更新对象
for attr, value in kwargs.items():
setattr(self, attr, value)
if commit:
return self.save()
return self

def delete(self, commit: bool = True) -> None:  # 删除对象
db.session.delete(self)
db.session.flush()
if commit:
db.session.commit()

@classmethod
def get_fields(cls):  # 获取字段配置或模型所有字段名
return cls.fields or [column.key for column in inspect(cls).attrs]

@property
def data(self):        # 根据字段名,返回对象的属性字典
fields = self.get_fields()
return {key: getattr(self, key) for key in fields}

</details>

<details>
<summary>具体模型类 models/user.py 内容参考</summary>
```python
from models.base import BaseModel, db

class Role(BaseModel):
    """角色"""
    __fields__ = ['id', 'name', 'description']
    name = db.Column(db.String(64), unique=True)
    description = db.Column(db.String(256), nullable=True)
    users = db.relationship('User', backref='role', lazy='dynamic')

class User(BaseModel):
    """用户"""
    __fields__ = ['id', 'name', 'username']
    username = db.Column(db.String(50), unique=True)
    name = db.Column(db.String(50))
    password_hash = db.Column(db.String(128))
    role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
模型包(Package)配置 models/__init__.py 参考 ```python from .user import User ```
接口基础配置 apis/base.py 参考内容 ```python from typing import Union

SUCCESS = (0, 'success')

class Errors:  # 错误码
BAD_REQUEST = (400, '请求参数错误')
USER_NOT_FOUND = (1001, '用户不存在')
USERNAME_EXIST = (1002, '用户名已存在')
# ...

def success_response(data: Union[dict, list] = None, msg=None, status_code=200):
code, msg = SUCCESS[0], msg or SUCCESS[1]
return {'code': code, 'msg': msg, 'data': data}, status_code

def error_response(error, data: Union[dict, list] = None, msg=None, append_msg=None, status_code=200):
code, msg = error[0], msg or error[1]
if append_msg is not None:
msg = '%s %s' % (msg, append_msg)
return {'code': code, 'msg': msg, 'data': data}, status_code

</details>

<details>
<summary>具体模型类 apis/user.py 内容参考</summary>

```python
from flask import Blueprint, request
from models.user import User
from .base import Errors, error_response, success_response

user_bp = Blueprint('user', __name__, url_prefix='/user')

@user_bp.post('/add')
def user_add():
    username = request.form.get('username')
    name = request.form.get('name') or ''
    password = request.form.get('password')  # todo 加密密码

    if not username:
        return error_response(Errors.BAD_REQUEST, msg='用户名不能为空')
    exist = User.get(username=username)

    if exist is not None:
        return error_response(Errors.USERNAME_EXIST)
    user = User.create(username=username, name=name, password_hash=password)
    return success_response(data=user.data, status_code=201)

@user_bp.get('/get/<int:id>')
def user_get(id):
    user = User.get(id)
    if user is None:
        return error_response(Errors.USER_NOT_FOUND)
    return success_response(data=user.data)

@user_bp.get('/list')
def user_list():
    users = User.list(**request.args)
    return success_response(data=[user.data for user in users])

@user_bp.post('/update/<int:id>')
def user_update(id):
    user = User.get(id)
    if user is None:
        return error_response(Errors.USER_NOT_FOUND)
    name = request.form.get('name') or ''
    password = request.form.get('password')  # todo 加密密码
    user.update(name=name, password_hash=password)
    return success_response(data=user.data)

@user_bp.post('/delete/<int:id>')
def user_delete(id):
    user = User.get(id)
    if user is None:
        return error_response(Errors.USER_NOT_FOUND)
    user.delete()
接口包(Package)路由注册 apis/__init__.py 内容参考
from flask import Blueprint
from .user import user_bp
# ...

api = Blueprint('api', __name__, url_prefix='/api')
api.register_blueprint(user_bp)
# ...