django-Celery 定时任务保存到数据库

发布时间 2023-09-06 10:38:10作者: 7dao

1.安装相关模块

  pip install django-celery-beat

  pip install django-celery-results

2.Setting配置修改

  INSTALLED_APPS = [

    ***
      'django_celery_beat',
      'django_celery_results',
   ]

  #配置定时任务连接数据库
  CELERY_RESULT_BACKEND = 'django-db'
  #配置定时任务存储数据库中
  CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

 

  运行同步数据库

  python manage.py migrate

  登录后台即可配置任务

  后台自动会把tasks.py文件里的函数同步过来

  任务添加:

  

  任务结果

  

 自定义接口创建定时任务

 1.创建一个 Django 视图(view)来处理前端传递的定时任务数据。在你的应用程序中的 views.py 文件中添加以下代码:

from django.http import JsonResponse
from django_celery_beat.models import PeriodicTask, IntervalSchedule

def create_periodic_task(request):
    # 处理前端传递的定时任务数据
    task_name = request.POST.get('task_name')
    task_interval = request.POST.get('task_interval')
    
    # 创建定时任务间隔对象
    interval, _ = IntervalSchedule.objects.get_or_create(every=task_interval, period=IntervalSchedule.SECONDS)
    
    # 创建新的定时任务
    periodic_task = PeriodicTask.objects.create(
        name=task_name,
        interval=interval,
        task='mycelery.tasks.my_custom_task',  # 替换为你的 Celery 任务函数路径
    )

    # 返回成功响应给前端
    return JsonResponse({'status': 'success', 'task_id': periodic_task.id})

2. 在你的应用程序中的 urls.py 文件中,创建一个 URL 映射将该视图和相应的 URL 关联起来。例如:

from django.urls import path
from mycelery.views import create_periodic_task

app_name = 'mycelery'

urlpatterns = [
    path('create-periodic-task/', create_periodic_task, name='create_periodic_task'),
]

3. 在前端页面中,使用 AJAX 或表单提交等方式调用该接口来发送定时任务数据。例如,使用 jQuery 的 AJAX 方法:

$.ajax({
  url: '/create-periodic-task/',
  method: 'POST',
  data: {
    task_name: 'Your task name here',
    task_interval: 5,  // 确定你的任务时间间隔
  },
  success: function(response) {
    // 成功处理响应
    console.log(response);
  },
  error: function(error) {
    // 处理错误
    console.log(error);
  }
});

  通过postman调用接口验证

  

  后台查看,添加成功

   

 

前端实现逻辑:

实际的任务函数名称在tasks中并没有,所以前端真正创建的时候 可以先创建task函数,然后任务名称是从task函数中筛选出来的。