【14.0】中间件、跨域资源共享、后台任务、测试用例

发布时间 2023-10-01 15:35:31作者: Chimengmeng

【一】中间件

【1】中间件介绍

image-20231001140700724

  • FastAPI 中间件是在处理请求和响应的过程中介入的组件,允许你在请求到达处理函数之前或响应离开处理函数之后执行一些逻辑。
  • 中间件在 FastAPI 中起到非常灵活的作用,可以用于日志记录、身份验证、异常处理等。

【2】中间件的工作原理

(1)注册中间件

  • 在 FastAPI 中,你可以通过 app.add_middleware 方法注册中间件。
  • 这通常在应用创建时进行。
from fastapi import FastAPI

app = FastAPI()

async def my_middleware(request, call_next):
    # 逻辑处理前
    response = await call_next(request)
    # 逻辑处理后
    return response

app.add_middleware(my_middleware)

(2)请求到达处理函数前的处理

  • 当有请求到达 FastAPI 应用时,它会首先经过注册的中间件。这些中间件可以在请求到达处理函数之前执行某些逻辑,如记录请求信息、进行身份验证等。

(3)处理函数逻辑执行

  • 请求会继续到达处理函数,执行处理函数中的逻辑。

(4)处理函数响应离开前的处理

  • 处理函数的响应在离开之前又会经过注册的中间件。这允许中间件执行一些处理函数响应离开前的逻辑,如记录响应信息、修改响应头等。

(5)中间件链

  • 多个中间件可以形成一个中间件链。每个中间件都有机会在请求到达处理函数前和处理函数响应离开前执行一些逻辑。中间件按照注册的顺序执行。

【3】使用步骤

from fastapi import FastAPI, HTTPException

app = FastAPI()

# 中间件
async def my_middleware(request, call_next):
    print("Processing request")
    response = await call_next(request)
    print("Processing response")
    return response

app.add_middleware(my_middleware)

# 路由
@app.get("/")
def read_root():
    return {"message": "Hello, World!"}
  • 在上面的例子中,my_middleware 中间件在处理请求前和处理响应前打印了一些信息。
  • 这只是中间件的一种简单用法,你可以根据实际需求编写更复杂的中间件逻辑。

【4】示例

(1)定义中间件

  • projects\run.py
import time

from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
import uvicorn
from turtorial import app03, app04, app05, app06, app07, app08
from coronavirus import application

# from fastapi.exceptions import RequestValidationError
# from fastapi.responses import PlainTextResponse
# from starlette.exceptions import HTTPException as StarletteHTTPException

app = FastAPI(
    title='FastAPI Tutorial and Coronavirus Tracker API Docs',
    description='FastAPI教程 新冠病毒疫情跟踪器API接口文档,项目代码:https://github.com/liaogx/fastapi-tutorial',
    version='1.0.0',
    docs_url='/docs',
    redoc_url='/redocs',
)

# mount表示将某个目录下一个完全独立的应用挂载过来,这个不会在API交互文档中显示
# .mount()不要在分路由APIRouter().mount()调用,模板会报错
# path 访问路由
# app 挂载文件对象 StaticFiles from fastapi.staticfiles import StaticFiles
# directory 指定具体的文件目录
# name 别名
app.mount(path='/static', app=StaticFiles(directory='./coronavirus/static'), name='static')


# # 重写HTTPException异常处理器
# @app.exception_handler(StarletteHTTPException)
# async def http_exception_handler(request, exc):
#     """
#     :param request: 这个参数不能省
#     :param exc:
#     :return:
#     """
#     return PlainTextResponse(str(exc.detail), status_code=exc.status_code)
#
#
# @app.exception_handler(RequestValidationError)  # 重写请求验证异常处理器
# async def validation_exception_handler(request, exc):
#     """
#     :param request: 这个参数不能省
#     :param exc:
#     :return:
#     """
#     return PlainTextResponse(str(exc), status_code=400)


# 将其他app添加到主路由下
# app03 : app名字
# prefix :自定义路由地址
# tags :自定义路由标题 (默认是default)


# 定义中间件 : 拦截请求并计算处理请求的时间
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
    '''

    :param request: 拦截到请求对象
    :param call_next:  # call_next将接收request请求做为参数
    :return:
    '''
    # 请求开始进入计时
    start_time = time.time()
    # 处理请求
    response = await call_next(request)
    # 请求结束计时
    process_time = time.time() - start_time
    # 将程序耗时 放到 响应头中
    # 添加自定义的以“X-”开头的请求头
    response.headers['X-Process-Time'] = str(process_time)
    
    return response


app.include_router(app03, prefix='/chapter03', tags=['第三章 请求参数和验证'])
app.include_router(app04, prefix='/chapter04', tags=['第四章 响应处理和FastAPI配置'])
app.include_router(app05, prefix='/chapter05', tags=['第五章 FastAPI的依赖注入系统'])
app.include_router(app06, prefix='/chapter06', tags=['第六章 安全、认证和授权'])
app.include_router(app07, prefix='/chapter07', tags=['第七章 FastAPI的数据库操作和多应用的目录结构设计'])
app.include_router(app08, prefix='/chapter08', tags=['第八章 中间件、CORS、后台任务、测试用例'])
app.include_router(application, prefix='/coronavirus', tags=['新冠病毒疫情跟踪器API'])


def main():
    # run:app : 启动文件:app名字
    # host :IP
    # port : 端口
    # reload : 自动重启
    # debug :debug 模式
    # worker : 开启的进程数
    uvicorn.run('run:app', host='127.0.0.1', port=8999, reload=True, debug=True, workers=1)


if __name__ == '__main__':
    main()

(2)测试

  • 由于是中间件,可以在任意视图函数生效,因此任意接口都会在响应头中携带时间

image-20231001142202704

【补充】带yield的依赖的退出部分的代码 和 后台任务 会在中间件之后运行

  • 在 FastAPI 中,带 yield 的依赖项和后台任务(Background Tasks)都与中间件密切相关,但它们的执行时机和作用略有不同。

(0)解释

  • 我们前面在数据库操作中定义了一个数据库连接和关闭的操作
    • 带 yield 的依赖
# 创建子依赖对象
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
  • 上面的意思是
    • 请求过来时,中间件处理请求
    • 中间件处理完毕,才会执行 finally 的代码

(1)yield 的依赖项(Depends)

  • yield 的依赖项用于在路径操作函数执行之前执行一些逻辑,并允许你在逻辑的开始和结束时执行额外的操作。

  • 在依赖项的执行过程中,当到达 yield 语句时,会暂停执行路径操作函数,并返回其值。

  • 然后在路径操作函数执行完毕后,继续执行 yield 之后的逻辑。

  • 示例:

from fastapi import Depends, FastAPI

app = FastAPI()

def get_query(background_tasks: BackgroundTasks, q: str = Depends(query_depend)):
    # Some async logic here
    result = some_function(q)
    
    # Enqueue a background task
    background_tasks.add_task(some_background_task, result)
    
    # Yield the result
    yield result
  • 在上述例子中,get_query 函数中的带 yield 的依赖项 Depends(query_depend) 中的逻辑会在请求到达路径操作函数前执行。
  • 当执行到 yield result 时,会先暂停执行 get_query 函数,执行路径操作函数,然后再在路径操作函数执行完毕后继续执行 get_query 函数中的 yield result 之后的逻辑。

(2)后台任务(Background Tasks)

  • 后台任务允许你在路径操作函数执行完毕后异步执行一些逻辑。
    • 它通常用于执行一些与请求响应过程无关的异步任务,如发送邮件、保存日志等。
  • 示例:
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def some_background_task(result):
    print(f"Doing some background work with {result}")

@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks
):
    # Some async logic here
    result = some_function(email)
    
    # Enqueue a background task
    background_tasks.add_task(some_background_task, result)
    
    return {"message": "Message sent"}
  • 在上述例子中,some_background_task 函数会在请求响应过程结束后异步执行,而不会阻塞响应的返回。
  • 这样可以保证响应迅速返回,而后台任务则可以在后台执行。

(3)中间件执行顺序

  • 中间件的执行顺序是在请求到达处理函数前,和在处理函数响应离开前。
  • 而带 yield 的依赖项的退出部分和后台任务则分别在处理函数执行完毕后和响应离开前执行。
  • 因此,中间件的执行顺序可以看作是在这两个时间点之间。

【二】跨域资源共享(CORS)

【1】跨域资源共享(CORS)介绍

image-20231001143048944

  • 跨域资源共享(CORS)是一种浏览器机制,它使用额外的 HTTP 头来告诉浏览器让一个 web 应用运行在一个源(domain)上的 web 页面能够请求来自于不同源服务器上的资源。
  • 在 FastAPI 中,你可以通过使用 fastapi.middleware.cors 模块提供的 CORSMiddleware 中间件来实现跨域资源共享。

【2】使用步骤

(1)安装 CORS 中间件

  • 在你的 FastAPI 项目中,首先需要确保你已经安装了 FastAPI 和 Uvicorn。

  • 然后,你需要安装 python-multipart 包,这是 FastAPI 内部依赖的一个库。

    pip install python-multipart
    

(2)导入和使用中间件

  • 在你的 FastAPI 应用中,导入 CORSMiddleware 中间件,并添加到应用的中间件链中。

  • 你可以指定允许的来源、允许的方法、允许的头等。

    from fastapi import FastAPI
    from fastapi.middleware.cors import CORSMiddleware
    
    app = FastAPI()
    
    # 允许所有来源访问,允许所有方法,允许所有头
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    
  • 在上面的例子中,allow_origins=["*"] 允许所有来源跨域访问。

  • 在生产环境中,你应该明确指定允许的来源,而不是使用 *

  • allow_methodsallow_headers 参数指定允许的 HTTP 方法和头。

(3)中间件参数说明

  • allow_origins: 允许访问的来源,可以是字符串或列表。
  • allow_credentials: 是否允许携带身份验证信息,如 cookies。
  • allow_methods: 允许的 HTTP 方法,可以是字符串或列表。
  • allow_headers: 允许的 HTTP 头,可以是字符串或列表。
  • expose_headers: 暴露给浏览器的头,可以是字符串或列表。
  • max_age: 指定预检请求的缓存时间(以秒为单位)。

(4)配置更多选项

  • CORS 中间件提供了其他配置选项,如 max_ageexpose_headers 等。你可以根据需求进行配置。

【3】注意事项

  • 在生产环境中,避免使用 allow_origins=["*"],而是明确指定允许的来源。
  • 跨域请求时,浏览器会发送一个 OPTIONS 请求(预检请求)以检查服务端是否支持跨域。CORS 中间件会处理这个 OPTIONS 请求。
  • 使用 CORS 中间件,你的 FastAPI 应用就可以支持跨域请求了。
  • 通过配置 CORS 中间件,FastAPI 应用就可以与不同域的前端应用进行跨域通信。
  • 这对于构建现代 Web 应用和提供 RESTful API 是非常重要的。

【4】示例

import time
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
import uvicorn
from turtorial import app03, app04, app05, app06, app07, app08
from coronavirus import application

# from fastapi.exceptions import RequestValidationError
# from fastapi.responses import PlainTextResponse
# from starlette.exceptions import HTTPException as StarletteHTTPException

app = FastAPI(
    title='FastAPI Tutorial and Coronavirus Tracker API Docs',
    description='FastAPI教程 新冠病毒疫情跟踪器API接口文档,项目代码:https://github.com/liaogx/fastapi-tutorial',
    version='1.0.0',
    docs_url='/docs',
    redoc_url='/redocs',
)

# mount表示将某个目录下一个完全独立的应用挂载过来,这个不会在API交互文档中显示
# .mount()不要在分路由APIRouter().mount()调用,模板会报错
# path 访问路由
# app 挂载文件对象 StaticFiles from fastapi.staticfiles import StaticFiles
# directory 指定具体的文件目录
# name 别名
app.mount(path='/static', app=StaticFiles(directory='./coronavirus/static'), name='static')


# # 重写HTTPException异常处理器
# @app.exception_handler(StarletteHTTPException)
# async def http_exception_handler(request, exc):
#     """
#     :param request: 这个参数不能省
#     :param exc:
#     :return:
#     """
#     return PlainTextResponse(str(exc.detail), status_code=exc.status_code)
#
#
# @app.exception_handler(RequestValidationError)  # 重写请求验证异常处理器
# async def validation_exception_handler(request, exc):
#     """
#     :param request: 这个参数不能省
#     :param exc:
#     :return:
#     """
#     return PlainTextResponse(str(exc), status_code=400)


# 将其他app添加到主路由下
# app03 : app名字
# prefix :自定义路由地址
# tags :自定义路由标题 (默认是default)

@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
    '''

    :param request: 拦截到请求对象
    :param call_next:  # call_next将接收request请求做为参数
    :return:
    '''
    # 请求开始进入计时
    start_time = time.time()
    # 处理请求
    response = await call_next(request)
    # 请求结束计时
    process_time = time.time() - start_time
    # 将程序耗时 放到 响应头中
    # 添加自定义的以“X-”开头的请求头
    response.headers['X-Process-Time'] = str(process_time)

    return response

app.add_middleware(
    # 使用 fastapi 自带的跨域包
    CORSMiddleware,
    # 允许跨域的域名
    allow_origins=[
        "http://127.0.0.1",
        "http://127.0.0.1:8080"
    ],
    # 允许使用证书
    allow_credentials=True,
    # 允许跨域的请求方法
    allow_methods=["*"],
    # 允许跨域的请求头
    allow_headers=["*"],
)


app.include_router(app03, prefix='/chapter03', tags=['第三章 请求参数和验证'])
app.include_router(app04, prefix='/chapter04', tags=['第四章 响应处理和FastAPI配置'])
app.include_router(app05, prefix='/chapter05', tags=['第五章 FastAPI的依赖注入系统'])
app.include_router(app06, prefix='/chapter06', tags=['第六章 安全、认证和授权'])
app.include_router(app07, prefix='/chapter07', tags=['第七章 FastAPI的数据库操作和多应用的目录结构设计'])
app.include_router(app08, prefix='/chapter08', tags=['第八章 中间件、CORS、后台任务、测试用例'])
app.include_router(application, prefix='/coronavirus', tags=['新冠病毒疫情跟踪器API'])


def main():
    # run:app : 启动文件:app名字
    # host :IP
    # port : 端口
    # reload : 自动重启
    # debug :debug 模式
    # worker : 开启的进程数
    uvicorn.run('run:app', host='127.0.0.1', port=8999, reload=True, debug=True, workers=1)


if __name__ == '__main__':
    main()

【三】后台任务

【1】介绍

  • FastAPI 中的后台任务(Background Tasks)允许你在请求处理函数返回响应后异步执行一些任务,而不会等待这些任务的完成。
  • 这对于执行一些不需要在请求响应中等待的异步操作非常有用,例如发送邮件、执行定时任务等。

【2】使用步骤

(1)引入后台任务

  • 在 FastAPI 中,你可以使用 BackgroundTasks 类来引入后台任务。

  • 你只需将 BackgroundTasks 对象作为处理函数的参数即可。

    from fastapi import BackgroundTasks, FastAPI
    
    app = FastAPI()
    
    def send_email(email: str, message: str, background_tasks: BackgroundTasks):
        # 后台任务中的逻辑,此处简化为打印信息
        background_tasks.add_task(print, f"Sending email to {email}, message: {message}")
    
    @app.post("/send-email/{email}")
    async def send_email_route(email: str, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_email, email, message="Hello!")
        return {"message": "Email will be sent in the background"}
    

(2)添加后台任务

  • 在处理函数中,你可以通过调用 background_tasks.add_task 方法来添加后台任务。

  • 该方法接受一个函数和一些参数,表示在后台执行的函数和其参数。

    • 在上面的例子中,send_email_route 处理函数中添加了一个后台任务,即 send_email 函数,用于发送邮件。

(3)异步执行后台任务

  • 当处理函数返回响应后,FastAPI 将异步执行添加的后台任务。

  • 这意味着处理函数不会等待后台任务的完成,而是立即返回响应。

    • 在上面的例子中,send_email 函数中的后台任务打印信息将在 send_email_route 函数返回响应后异步执行。

(4)多个后台任务

  • 你可以在处理函数中添加多个后台任务。

  • 这些任务将按照添加的顺序异步执行。

    @app.post("/multiple-tasks/{email}")
    async def multiple_tasks(email: str, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_email, email, message="Hello!")
        background_tasks.add_task(print, "Another background task")
        return {"message": "Multiple background tasks added"}
    

(5)限制

  • 后台任务仅适用于异步函数,因此你不能在后台任务中执行同步函数。
  • 后台任务也不适用于 WebSocket 路由。
  • 使用后台任务的主要优势在于它允许你在请求处理函数返回响应后执行异步操作,而不会阻塞请求-响应周期。
  • 这对于处理那些可以在后台异步完成的操作非常有用。

【3】示例

(1)后台任务视图函数

from fastapi import APIRouter, BackgroundTasks

app08 = APIRouter()


def bg_task(framework: str):
    with open("README.md", mode="a") as f:
        f.write(f"## {framework} 框架精讲")


@app08.post("/background_tasks")
async def run_bg_task(framework: str, background_tasks: BackgroundTasks):
    """
    :param framework: 被调用的后台任务函数的参数
    :param background_tasks: FastAPI.BackgroundTasks
    :return:
    """
    # 模拟执行耗时任务
    background_tasks.add_task(bg_task, framework)
    return {"message": "任务已在后台运行"}
  • 发起请求

image-20231001145620158

(2)后台任务依赖任务

from fastapi import APIRouter, BackgroundTasks, Depends
from typing import Optional

app08 = APIRouter()


def bg_task(framework: str):
    with open("README.md", mode="a") as f:
        f.write(f"## {framework} 框架精讲")


@app08.post("/background_tasks")
async def run_bg_task(framework: str, background_tasks: BackgroundTasks):
    """
    :param framework: 被调用的后台任务函数的参数
    :param background_tasks: FastAPI.BackgroundTasks
    :return:
    """
    # 模拟执行耗时任务
    background_tasks.add_task(bg_task, framework)
    return {"message": "任务已在后台运行"}


def continue_write_readme(background_tasks: BackgroundTasks, q: Optional[str] = None):
    if q:
        background_tasks.add_task(bg_task,
                                  "\n> 整体的介绍 FastAPI,快速上手开发,结合 API 交互文档逐个讲解核心模块的使用\n")
    return q


@app08.post("/dependency/background_tasks")
async def dependency_run_bg_task(q: str = Depends(continue_write_readme)):
    if q:
        return {"message": "README.md更新成功"}
  • 发起请求

image-20231001145858421

【4】项目中的应用

from typing import List

import requests
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request
from fastapi.templating import Jinja2Templates
from pydantic import HttpUrl
from sqlalchemy.orm import Session
from starlette import status
from coronavirus import curd, schemas
from coronavirus.database import engine, Base, SessionLocal
from coronavirus.models import City, Data

# 创建子路由
application = APIRouter()

# 创建前端页面配置
templates = Jinja2Templates(directory='./coronavirus/templates')

# 初始化数据库引擎对象
Base.metadata.create_all(bind=engine)


# 创建子依赖对象
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# 创建城市
@application.post('/create_city', response_model=schemas.ReadCity)
async def create_city(city: schemas.CreateCity, db: Session = Depends(get_db)):
    '''

    :param city: 前端传入的符合 CreateCity 格式的城市数据
    :param db: 数据库操作对象,基于子依赖的数据库操作
    :return:
    '''
    # 判断是否存在当前城市 --- 根据前端传入的城市名字进行过滤
    db_city = curd.get_city_by_name(db=db, name=city.province)
    # 存在则主动抛出异常
    if db_city:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='City is already registered!'
        )
    # 不存在则创建
    return curd.create_city(db=db, city=city)


# 查询城市数据
@application.get('/get_city/{city}', response_model=schemas.ReadCity)
async def get_city(city: str, db: Session = Depends(get_db)):
    '''

    :param city: 路径参数,路径中的城市名
    :param db: 数据库对象,依赖子依赖
    :return:
    '''
    # 使用数据库对象查询数据
    db_city = curd.get_city_by_name(db=db, name=city)
    # 校验数据是否存在
    if db_city is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail='City not found!'
        )
    # 数据存在
    return db_city


# 查询多个城市的数据
@application.get('/get_cities', response_model=List[schemas.ReadCity])
async def get_cities(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    '''

    :param skip: 起始位置
    :param limit: 结束位置
    :param db: 数据库对象,依赖子依赖
    :return:
    '''
    cities = curd.get_cities(db=db, skip=skip, limit=limit)

    return cities


# 创建数据
@application.post('/create_data', response_model=schemas.ReadData)
async def create_data_for_city(city: str, data: schemas.CreateData, db: Session = Depends(get_db)):
    '''

    :param city: 给那个城市创建数据
    :param data: 城市的详细数据
    :param db: 数据库对象,依赖子依赖
    :return:
    '''
    # 查询当前城市是否存在
    db_city = curd.get_city_by_name(db=db, name=city)
    # 创建数据
    data = curd.create_city_data(db=db, data=data, city_id=db_city.id)
    return data


# 获取数据
@application.get('/get_data')
async def get_data(city: str = None, skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    '''

    :param city: 城市名字
    :param skip: 起始位置
    :param limit: 截止位置
    :param db: 数据库对象,依赖子依赖
    :return:
    '''
    data = curd.get_data(city=city, skip=skip, limit=limit, db=db)
    return data


# 测试定义根路径首页页面路由
# @application.get('/')
# async def coronavirus(request: Request, city: str = None, skip: int = 0, limit: int = 10,
#                       db: Session = Depends(get_db)):
#     # 查询数据
#     data = curd.get_data(city=city, skip=skip, limit=limit, db=db)
#     return templates.TemplateResponse(
#         # 前端页面文件名
#         "home.html",
#         {
#             # 请求对象
#             "request": request,
#             # 查询到的数据
#             "data": data,
#             # 回调 uRL
#             "sync_data_url": "/coronavirus/sync_coronavirus_data/jhu"
#         }
#     )

# 定义后台任务
def bg_task(url: HttpUrl, db: Session):
    """这里注意一个坑,不要在后台任务的参数中db: Session = Depends(get_db)这样导入依赖"""
    # 利用 requests 请求第三方数据
    city_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=false")

    # 判断请求是否成功
    if 200 == city_data.status_code:
        # 同步数据前先清空原有的数据
        # City 表模型类
        db.query(City).delete()
        # 遍历每个城市的数据
        for location in city_data.json()["locations"]:
            # 构建数据格式
            city = {
                "province": location["province"],
                "country": location["country"],
                "country_code": "CN",
                "country_population": location["country_population"]
            }
            # 创建数据
            curd.create_city(db=db, city=schemas.CreateCity(**city))

    #  利用 requests 请求第三方数据
    coronavirus_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=true")

    # 判断请求是否成功
    if 200 == coronavirus_data.status_code:
        # Data 表模型类
        db.query(Data).delete()
        # 遍历数据
        for city in coronavirus_data.json()["locations"]:
            # 获取到城市对象
            db_city = curd.get_city_by_name(db=db, name=city["province"])
            # 存在则遍历数据
            for date, confirmed in city["timelines"]["confirmed"]["timeline"].items():
                data = {
                    "date": date.split("T")[0],  # 把'2020-12-31T00:00:00Z' 变成 ‘2020-12-31’
                    "confirmed": confirmed,
                    "deaths": city["timelines"]["deaths"]["timeline"][date],
                    "recovered": 0  # 每个城市每天有多少人痊愈,这种数据没有
                }
                # 这个city_id是city表中的主键ID,不是coronavirus_data数据里的ID
                curd.create_city_data(db=db, data=schemas.CreateData(**data), city_id=db_city.id)


@application.get("/sync_coronavirus_data/jhu")
def sync_coronavirus_data(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    """从Johns Hopkins University同步COVID-19数据"""
    # 添加后台任务
    background_tasks.add_task(bg_task, "https://coronavirus-tracker-api.herokuapp.com/v2/locations", db)
    # 弹出框中的提示信息
    return {"message": "正在后台同步数据..."}


# 定义根路由逻辑
@application.get("/")
def coronavirus(request: Request, city: str = None, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # 从数据库获取到数据
    data = curd.get_data(db, city=city, skip=skip, limit=limit)
    # 渲染到页面上
    return templates.TemplateResponse("home.html", {
        "request": request,
        "data": data,
        "sync_data_url": "/coronavirus/sync_coronavirus_data/jhu"
    })

【四】测试用例 TestClient

【1】编写测试用例

  • projects\turtorial\test_chapter08.py
from fastapi.testclient import TestClient

from run import app

"""Testing 测试用例"""

client = TestClient(app)  # 先pip install pytest


def test_run_bg_task():  # 函数名用“test_”开头是 pytest 的规范。注意不是async def
    response = client.post(url="/chapter08/background_tasks?framework=FastAPI")
    assert response.status_code == 200
    assert response.json() == {"message": "任务已在后台运行"}


def test_dependency_run_bg_task():
    response = client.post(url="/chapter08/dependency/background_tasks")
    assert response.status_code == 200
    assert response.json() is None


def test_dependency_run_bg_task_q():
    response = client.post(url="/chapter08/dependency/background_tasks?q=1")
    assert response.status_code == 200
    assert response.json() == {"message": "README.md更新成功"}

【2】启动测试用例

  • 进入到测试用例文件所在的目录

  • 启动测试用例

    • pytest
    • 会提示是否成功,有哪些错误
(venv) PS E:\projects\turtorial> pytest
======================================== test session starts ======================================== 
platform win32 -- Python 3.9.13, pytest-6.2.1, py-1.10.0, pluggy-0.13.1
rootdir: E:\projects\turtorial
collected 0 items / 1 error

============================================== ERRORS ===============================================
________________________________ ERROR collecting test_chapter08.py _________________________________
test_chapter08.py:7: in <module>
    from run import app
..\run.py:32: in <module>
    app.mount(path='/static', app=StaticFiles(directory='./coronavirus/static'), name='static')      
..\..\venv\lib\site-packages\starlette\staticfiles.py:55: in __init__
    raise RuntimeError(f"Directory '{directory}' does not exist")
E   RuntimeError: Directory './coronavirus/static' does not exist
------------------------------------------ Captured stdout ------------------------------------------ 
2023-10-01 15:23:10,546 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR
(60)) AS anon_1
2023-10-01 15:23:10,546 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,546 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCH
AR(60)) AS anon_1
2023-10-01 15:23:10,546 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,547 INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("city")
2023-10-01 15:23:10,547 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,547 INFO sqlalchemy.engine.base.Engine PRAGMA temp.table_info("city")
2023-10-01 15:23:10,548 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,548 INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("data")
2023-10-01 15:23:10,548 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,548 INFO sqlalchemy.engine.base.Engine PRAGMA temp.table_info("data")
2023-10-01 15:23:10,548 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,549 INFO sqlalchemy.engine.base.Engine
CREATE TABLE city (
        id INTEGER NOT NULL,
        province VARCHAR(100) NOT NULL,
        country VARCHAR(100) NOT NULL,
        country_code VARCHAR(100) NOT NULL,
        country_population BIGINT NOT NULL,
        created_at DATETIME DEFAULT (CURRENT_TIMESTAMP),
        updated_at DATETIME DEFAULT (CURRENT_TIMESTAMP),
        PRIMARY KEY (id),
        UNIQUE (province)
)


2023-10-01 15:23:10,549 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:10,942 INFO sqlalchemy.engine.base.Engine COMMIT
2023-10-01 15:23:10,942 INFO sqlalchemy.engine.base.Engine CREATE INDEX ix_city_id ON city (id)       
2023-10-01 15:23:10,942 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:11,216 INFO sqlalchemy.engine.base.Engine COMMIT
2023-10-01 15:23:11,216 INFO sqlalchemy.engine.base.Engine
CREATE TABLE data (
        id INTEGER NOT NULL,
        city_id INTEGER,
        date DATE NOT NULL,
        confirmed BIGINT NOT NULL,
        deaths BIGINT NOT NULL,
        recovered BIGINT NOT NULL,
        created_at DATETIME DEFAULT (CURRENT_TIMESTAMP),
        updated_at DATETIME DEFAULT (CURRENT_TIMESTAMP),
        PRIMARY KEY (id),
        FOREIGN KEY(city_id) REFERENCES city (id)
)


2023-10-01 15:23:11,216 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:11,456 INFO sqlalchemy.engine.base.Engine COMMIT
2023-10-01 15:23:11,456 INFO sqlalchemy.engine.base.Engine CREATE INDEX ix_data_id ON data (id)       
2023-10-01 15:23:11,456 INFO sqlalchemy.engine.base.Engine ()
2023-10-01 15:23:11,696 INFO sqlalchemy.engine.base.Engine COMMIT
========================================= warnings summary ========================================== 
<string>:2
<string>:2
  <string>:2: SADeprecationWarning: The mapper.order_by parameter is deprecated, and will be removed i
n a future release. Use Query.order_by() to determine the ordering of a result set.

-- Docs: https://docs.pytest.org/en/stable/warnings.html
====================================== short test summary info ======================================
ERROR test_chapter08.py - RuntimeError: Directory './coronavirus/static' does not exist
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
=================================== 2 warnings, 1 error in 2.48s ==================================== 
(venv) PS E:\projects\turtorial>