celery异步任务、延迟任务、定时任务

发布时间 2023-07-06 09:18:10作者: 树苗叶子

异步任务

# 比如有个函数add,需要传入两个参数,使用delay就是异步任务
# 配置脚本详见:https://www.cnblogs.com/smyz/p/17525174.html
res = add.delay(1, 2)

延迟任务

@app.task
def add(a, b):
    print('正在计算中')
    time.sleep(5)
    return a + b

# 需要传入一个时间对象
from datetime import datetime, timedelta
# 拿到utc的当前时间datetime.utcnow() 因为我的环境是utc时间
# timedelta是生成的时间类型的值可以与时间进行运算,直接运算是不行的
eta = datetime.utcnow() + timedelta(seconds=10)

# 10秒之后执行这个任务
res = add.apply_async(args=[这里面是add函数的参数], eta=时间参数)
# 例如:
res = add.apply_async(args=['add'], eta=eta)
  • 修改时区方法
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

定时任务

  • 定时任务需要启动beat和worker
  • beat: 定时提交任务的进程
  • 启动方法:
cd scripts
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
  • 内容
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置一:每隔多久执行一次
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'my_task': {  # 这个名字可以随便写
        'task': 'celery_task.tasks.add',  # 这里是任务名字(需要写全路径)
        'schedule': timedelta(seconds=3),  # 传入一个时间对象,意思是每三秒执行一下
        'args': (300, 150),  # 要传入的参数
    },
    'my_task2':{...},
    'my_task3':{...}
}

# 任务的定时配置二:与linux crontab相同,写好什么时候执行
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'my_task': {  # 这个名字可以随便写
        'task': 'celery_task.tasks.add',  # 这里是任务名字(需要写全路径)
        'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点,需要导入crontab包
        'args': (300, 150),  # 要传入的参数
    },
    'my_task2':{...},
    'my_task3':{...}
}