APScheduler定时任务框架

发布时间 2023-08-30 11:40:52作者: 星空看海

前言

在日常工作中,如果想要简化工作流程实现办公自动化,那么几乎有大半的功能模块都需要使用定时任务,例如定时收发邮件,或者定时发微信或是检测垃圾邮件等等,而在python中常用实现定时任务的包含以下四种方法:

  • while True: + sleep()
  • threading.Timer定时器
  • 调度模块schedule
  • 任务框架APScheduler
    但在实际测试中,可以发现:

循环+sleep方式可以用来做简单测试。
timer可以实现异步定时任务。
schedule可以定点定时执行,但是仍然需要while Ture配合,而且占用内存大。
https://blog.csdn.net/weixin_44799217/article/details/127352957
APScheduler框架更加强大,可以直接在里面添加定点与定时任务,无可挑剔!!!

一、APscheduler简介

APscheduler全称Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业,其是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。

二、APscheduler安装

我们通过Anaconda管理虚拟环境,并进行APscheduler库的安装。

pip install apscheduler

三、APscheduler组成部分

# 触发器(trigger):
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。

# 作业存储(job store):
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。

# 执行器(executor):
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

# 调度器(scheduler):
其他的组成部分。通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 

3.1 Job 作业

作用:

Job作为APScheduler最小执行单位。
创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

构建说明:

id:指定作业的唯一ID
name:指定作业的名字
trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的
时间, 满足时将会执行
executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此
job的 执行器,执行job指定的函数
max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数
来确定是否可执行
next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触
发时间
misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致
21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行
一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
func:Job执行的函数
args:Job执行函数需要的位置参数
kwargs:Job执行函数需要的关键字参数

3.2 Trigger 触发器

包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了它们自己初始配置以外,触发器完全是无状态的。

APScheduler 有三种内建的 trigger:

  • date: 特定的时间点触发
  • interval: 固定时间间隔触发
  • cron: 在特定时间周期性地触发
    触发器就是根据你指定的触发方式,比如是按照时间间隔,还是按照 crontab触发,触发条件是什么等。每个任务都有自己的触发器。

3.3 Jobstore 任务存储器

任务存储器的选择有两种。一是内存,也是默认的配置。二是数据库。使用内存的方式是简单高效,但是不好的是,一旦程序出现问题,重新运行的话,会把之前已经执行了的任务重新执行一遍。数据库则可以在程序崩溃后,重新运行可以从之前中断的地方恢复正常运行。有以下几种选择:

  • MemoryJobStore:没有序列化,任务存储在内存中,增删改查都是在内存中完成。
  • SQLAlchemyJobStore:使用 SQLAlchemy这个 ORM框架作为存储方式。
  • MongoDBJobStore:使用 mongodb作为存储器。
  • RedisJobStore:使用 redis作为存储器。

任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。

3.4 Executor 执行器

Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor。找到实际的执行器对象,然后根据执行器对象执行Job。

执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:

  • ThreadPoolExecutor:线程池执行器。
  • ProcessPoolExecutor:进程池执行器。
  • GeventExecutor: Gevent程序执行器。
  • TornadoExecutor: Tornado程序执行器。
  • TwistedExecutor: Twisted程序执行器。
  • AsyncIOExecutor: asyncio程序执行器。

处理作业的运行,它们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

3.5 scheduler 调度器

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。

  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
  • GeventScheduler:适用于使用 gevent模块的应用程序。
  • TwistedScheduler:适用于构建 Twisted的应用程序。
  • QtScheduler:适用于构建 Qt的应用程序。

任务调度器是属于整个调度的总指挥官。它会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。

四、Scheduler工作流程图

4.1 Scheduler添加job流程

  • 1 添加任务:使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。
  • 2 删除任务:使用 scheduler.remove_job(job_id,jobstore=None)。
  • 3 暂停任务:使用 scheduler.pause_job(job_id,jobstore=None)。
  • 4 恢复任务:使用 scheduler.resume_job(job_id,jobstore=None)。
  • 5 修改某个任务属性信息:使用 scheduler.modify_job(job_id,jobstore=None,**changes)。
  • 6 修改单个作业的触发器并更新下次运行时间:使用 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)
  • 7 输出作业信息:使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)

4.2 Scheduler调度流程

五、APscheduler使用

5.1 简单应用

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
def my_job(i):
    print (i)
 
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5,values=['学会了'])
sched.start()

输出:

5.2 操作作业

5.2.1 date触发器

date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

参数 说明
run_date(datetime or str) 任务运行的日期或者时间
timezone(datetime.tzinfo or str) 指定时区
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
scheduler = BlockingScheduler()
 
def my_job(text):
    print(text)
    
# datetime类型(用于精确时间)
scheduler.add_job(my_job, 'date', run_date=datetime(2022, 4, 25, 17, 30, 5), args=['测试任务'])
 
scheduler.start()

注意:run_date参数可以是date类型、datetime类型或文本类型。

5.2.2 interval触发器

固定时间间隔触发。interval 间隔调度,参数如下:

参数 说明
weeks(int) 间隔几周
days(int) 间隔几天
hours(int) 间隔几小时
minutes(int) 间隔几分钟
seconds(int) 间隔多少秒
start_date(datetime or str) 开始日期
end_date(datetime or str) 结束日期
timezone(datetime.tzinfo or str) 时区
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
def job_func():
     print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")
 
scheduler = BlockingScheduler()
 
# 每2小时触发
scheduler.add_job(job_func, 'interval', hours=2)
 
# 在 2019-04-15 17:00:00 ~ 2019-12-31 24:00:00 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2022-04-29 17:00:00' , end_date='2022-12-31 24:00:00')
 
scheduler.start()

jitter振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。

# 每小时(上下浮动120秒区间内)运行`job_function`
scheduler.add_job(job_func, 'interval', hours=1, jitter=120)

5.2.3 cron触发器

在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。

cron 参数:

参数 说明
year(int or str) 年,4位数字
month(int or str) 月(范围1-12)
day(int or str) 日(范围1-31)
week(int or str) 周(范围1-53)
day_of_week(int or str) 周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,stat,sun)
hour(int or str) 时(0-23)
minute(int or str) 分(0-59)
second(int or str) 秒(0-59)
start_date(datetime or str) 最早开始日期(含)
end_date(datetime or str) 最晚结束日期(含)
timezone(datetime.tzinfo or str) 指定时区

表达式类型

表达式 参数类型 描述
* 所有 通配符。例:minutes=*即每分钟触发
*/a 所有 可被a整除的通配符。
a-b 所有 范围a-b触发
a-b/c 所有 范围a-b,且可被c整除时触发
xth y 第几个星期几触发。x为第几个,y为星期几
last x 一个月中,最后的星期几触发
last 一个月最后一天触发
x,y,z 所有 组合表达式,可以组合确定值或上方的表达式

示例

import datetime
from apscheduler.schedulers.background import BackgroundScheduler
 
def job_func(text):
    print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
 
scheduler = BackgroundScheduler()
# 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
 
scheduler.start()