celery 任务

发布时间 2024-01-11 14:54:34作者: tslam

一、周期性任务

示例代码

from django.core.mail import send_mail
from celery.task.base import periodic_task
from celery.schedules import crontab
from celery.exceptions import SoftTimeLimitExceeded


# @periodic_task(run_every=crontab(minute=1, hour='0,7'))
@periodic_task(run_every=crontab(minute='*/5'))  # 设置任务的定时执行时间间隔
def celery_is_run():
    try:
        # 在这里编写你的任务逻辑
        pass
    except SoftTimeLimitExceeded as e:
        # 处理任务超时的异常
        send_email_on_failure(e)
    except Exception as e:
        # 处理其他异常
        send_email_on_failure(e)


def send_email_on_failure(exception):
    # 构造邮件内容
    subject = 'Celery任务失败'
    message = f'Celery任务执行失败:{str(exception)}'
    from_email = 'your_email@example.com'
    recipient_list = ['recipient1@example.com', 'recipient2@example.com']

    # 发送邮件
    send_mail(subject, message, from_email, recipient_list)
View Code

 

二、