fastapi项目 07-APScheduler

发布时间 2024-01-07 21:27:10作者: dack_deng

1. APScheduler

Advanced Python Scheduler(APScheduler)是一个Python库,可让Python代码稍后执行,一次或定期执行。用于调度和管理定时任务,它支持多种任务调度器,如基于日期、时间间隔和Cron表达式等。
如果您将作业存储在数据库中,那么调度程序重启后它们也将存活下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有作业。APScheduler文档https://link.zhihu.com/?target=https%3A//apscheduler.readthedocs.io/en/latest/index.html
需要进行pip安装:pip install apscheduler

1.1 基本概念

apscheduler 四个组件:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

triggers 3种触发方式

  • date:固定日期触发器,任务只运行一次
  • interval 时间间隔触发器
  • cron 定时任务触发

job stores 支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers
调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用

Flask-APScheduler 中默认使用的就是 BackgroundScheduler

1.2 triggers触发器

triggers支持三种任务触发方式:

    1. date:固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建。
      使用示例:
scheduler.add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
    1. interval 时间间隔触发器,每个一定时间间隔执行一次。
参数 说明
weeks (int) 间隔几周
days (int) 间隔几天
hours (int) 间隔几小时
minutes (int) 间隔几分钟
seconds (int) 间隔多少秒
start_date (datetime 或 str) 开始日期
end_date (datetime 或 str) 结束日期
scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
    1. cron 定时任务触发
参数 说明
year (int 或 str) 表示四位数的年份 (2019)
month(int\ str) 月 (范围1-12) 可以是int类型,也可以是str类型
day(int\ str) 日 (范围1-31)
week(int\ str) 周 (范围1-53)
day_of_week (int\ str) 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\ str) 表示取值范围为0-23时
minute (int\ str) 表示取值范围为0-59分
second (int\ str) 表示取值范围为0-59秒
start_date (datetime\ str) 表示开始时间 可以是datetime类型,也可以是str类型
end_date (datetime\ str) 表示结束时间
timezone (datetime.tzinfo\ str) 表示时区取值

表示每 10 秒执行该程序一次,相当于interval 间隔调度中seconds = 10

sched.add_job(my_job, 'cron', second = '*/10')

2. 结合fastapi的APScheduler

from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import time

app = FastAPI()

scheduler = AsyncIOScheduler()
scheduler.start()

async def task3(x):
    print(f'task 3 executed  {x}--------', time.time())


async def task4(y):
    print(f'task 4 executed  {y}--------', time.time())


@app.get("/start_task3")
async def start_task3():
    scheduler.add_job(task3, id='task3', trigger='date', run_date='2024-01-07 20:55:00', args=["aaa"])
    return {"msg": "任务启动成功"}


@app.get("/start_task4")
async def start_task4():
    scheduler.add_job(task4, id='task4',trigger='date', run_date='2024-01-07 20:55:00', args=["bbb"])
    return {"msg": "任务启动成功"}


@app.get("/task_list")
async def task_list():
    jobs = scheduler.get_jobs()  # 获取全部的jobs
    jobs_info = []
    for job in jobs:
        info = {}
        info['id'] = job.id
        info['next_run_time'] = job.next_run_time
        jobs_info.append(info)
    return jobs_info

@app.get("/task_pause")
async def task_pause(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.pause()
        return {"msg": "task id 已暂停"}
    else:
        return {"msg": "task id 不存在"}


@app.get("/task_delete")
async def task_del(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.remove()
        return {"msg": "task id 已删除"}
    else:
        return {"msg": "task id 不存在"}


@app.get("/task_resume")
async def task_resume(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.resume()
        return {"msg": "task id 已恢复"}
    else:
        return {"msg": "task id 不存在"}
    
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app='main:app',
        host="127.0.0.1",
        port=8020, reload=True
    )