Celery周期性任务定义beat

发布时间 2023-10-04 12:11:39作者: 蕝戀

通过celery beat可以使用周期性任务的定义。

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html

周期性任务beat相关设置:

https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-beat_schedule

您必须确保一次只运行一个beat调度程序,否则您最终会遇到重复的任务。

定义周期性任务

要定期调用任务,您必须向beat调度列表中添加一个实例条目。

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('hello') every 30 seconds.
    # It uses the same signature of previous task, an explicit name is
    # defined to avoid this task replacing the previous one defined.
    sender.add_periodic_task(30.0, test.s('hello'), name='add every 30')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

@app.task
def add(x, y):
    z = x + y
    print(z)

请注意, on_after_configure 是在应用程序设置后发送的,因此声明应用程序的模块外部的任务(例如,位于 celery.Celery.autodiscover_tasks() 的tasks.py 文件中)必须使用 on_after_finalize

add_periodic_task() 函数会在后台将实例条目添加到 beat_schedule 设置中,

同样我们还可以用设置选项的方式手动设置周期性任务:

示例:每 30 秒运行一次tasks.add 任务。

app.conf.beat_schedule = {
    'add-every-30-seconds': { # 任务名
        'task': 'tasks.add',  # 任务执行的函数名
        'schedule': 30.0,     # 调度策略,可以是整数秒数、timedelta 或 crontab 。您还可以通过扩展 schedule 的接口来定义自己的自定义计划类型。
        'args': (16, 16)      # 传递给任务函数的参数
    },
    "add-every-hour": {
         "task": "tasks.add",
         'schedule': timedelta(hours=1), # 每小时执行1次
         'args': (3, 8) # 传递参数-
     },
}

beat_schedule 的参数:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#available-fields

周期性任务调度器

Crontab schedules

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#crontab-schedules

这种调度策略就是linux中的crontab。比较灵活。

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Solar schedules根据太阳日落日出调度

如果你有一个任务需要根据日出、日落、黎明或黄昏执行,你可以使用 solar 调度类型:

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#solar-schedules

这个一般不怎么用.....

启动beat调度程序服务器

celery.exe -A celery_tasks.main beat -l INFO

您还可以通过启用workers -B 选项将beat嵌入到worker中,如果您永远不会运行多个worker节点,这会很方便,但它并不常用,因此不建议用于生产用途:【这玩意windows不支持!!!】

celery -A proj worker -B

Beat 需要将任务的最后运行时间存储在本地数据库文件(默认名称为 celerybeat-schedule,格式是.db)中,因此需要在当前目录中进行写入,或者您可以为此文件指定自定义位置:

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule