django-celery定时任务(beat)

发布时间 2023-09-05 13:59:51作者: 7dao

前言

  Celery 可以异步执行,也可以通过定时任务触发

Django 中使用 Celery

  要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)

  如果你有一个现代的 Django 项目布局,比如:

  

   创建一个celery模块,来定义celery实例

  

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mycelery.settings')

app = Celery('mycelery')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

  mycelery是自己django项目中的app名称

  然后你需要在你的mycelery/mycelery/__init__.py 模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便 @shared_task 装饰器(稍后提到)将使用它:

from .celery import app as celery_app

__all__ = ('celery_app',)

tasks任务

 在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

from __future__ import absolute_import
from celery import shared_task


@shared_task
def add(x, y):
    print("task----------111111----------")
    return x + y


@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

添加setting配置

INSTALLED_APPS = [
    ***
    'mycelery',
]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

#  celery 配置连接redis
CELERY_BROKER_URL = 'redis://150.158.47.37:6379'
CELERY_RESULT_BACKEND = 'redis://150.158.47.37:6379'


# 配置定时任务
from celery.schedules import crontab
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'add': {
        'task': 'mycelery.tasks.add',  # 任务
        'schedule': timedelta(seconds=5),  # 每5秒执行add函数
        'args': (11, 12)  # 运行参数
    },
    'mul': {
        'task': 'mycelery.tasks.mul',  # 任务
        'schedule': timedelta(seconds=10),  # 每10秒执行mul函数
        'args': (11, 2)  # 运行参数
    }
}

  task:对应的路径地址是app下tasks的路径和对应的函数名称

启动worker 和beat服务

  启动worker,执行任务

  celery -A mycelery worker -l info -P eventlet

  

   启动beat 定时任务监听

  celery -A mycelery(django 项目名称) beat -l info

  

 

crontab 周期任务

前面是设置每多少秒执行任务,这个只是测试下功能,任务很简单,我们一般用crontab 实现周期性任务,比如每周1-5早上执行一遍任务,用crontab 可以轻松实现

# crontab任务
# 每周一8:30调用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 8:30 A.M
    'add': {
        'task': 'yoyo(django app名称).tasks.add',  # 任务
        'schedule': crontab(hour=8, minute=30, day_of_week=1),
        'args': (11, 12)  # 运行参数
    }
}