celery的使用

发布时间 2023-07-24 15:40:55作者: F___Q

前言

官方文档
中文官方文档
基于 python 开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,但本身不提供消息服务。

Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。

  • 中间件 (Broker):接收和发送消息,通常以独立的服务形式出现。常用 Redis 或 RabbitMQ

使用

启动方式

  1. 直接运行文件 python app.py
示例
from celery import Celery

broker = ...
backend = ...
app = Celery('tasks', broker=broker, backend=backend)


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    # app.worker_main(argv=args)
    app.start(args)
  1. 命令行启动
示例
# module/app.py
celery worker -A module.app -l INFO -c 4

# 后台启动
celery multi start worker -A module.app -l INFO
  • -c:进程数
  • -l:日志级别
  • 重启:celery multi restart ...
  • 停止:celery multi stop ...
  • 等待任务完成再停止:celery multi stopwait ...

基础使用

单文件任务

创建实例
import time

import celery

# 'amqp://USER:PASSWORD@HOST:PORT/QUEUE'        # RabbitMQ
# 'redis://PASSWORD@HOST:PORT/DB'               # redis
broker = 'amqp://guest:guest@127.0.0.1'
backend = 'redis://localhost:6379/1'

# 创建实例
# tasks:项目名
app = celery.Celery('tasks', backend=backend, broker=broker)


@app.task
def add(*args):
    return sum(args)


@app.task
def multi(*args):
    result = 1
    for i in args:
        result *= i
    return result


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    # app.worker_main(argv=args)
    app.start(args)


生产者(发布消息)
from app import add, multi

result_add = add.delay(2, 3)		# delay()是apply_async()的快捷方法
print(f'add的任务id:{result_add.id}')

result_multi = multi.delay(2, 3)
print(f'multi的任务id:{result_multi.id}')
任务结果校验
import time

from celery.result import AsyncResult

from app import app

result_id = ''		# 填入某个任务的id
async_result = AsyncResult(id=result_id, app=app)

while True:
    if async_result.successful():
        result = async_result.get()
        print(f'结果:{result}')
        # result.forget()     # 将结果删除。执行完成后,结果不会自动删除
        # result.revoke(terminate=True)   # 无论现在是什么时候,都终止
        # result.revoke(terminate=False)   # 如果任务还没开始执行,则终止
        break
    elif async_result.failed():
        print('执行失败')
        break
    elif async_result.status == 'PENDING':
        print('等待执行')
    elif async_result.status == 'RETRY':
        print('重试中')
    elif async_result.status == 'STARTED':
        print('执行中')
    time.sleep(3)

多任务

通过 include 参数指定任务模块

创建实例
# module/app.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/1'

app = Celery(
    'celery',
    broker=broker,
    backend=backend,
    include=[
        'task01',
        'task02',
    ]
)


if __name__ == '__main__':
    args = ['worker', '--loglevel=INFO']
    app.start(args)
任务1
# module/task01.py
from app import app


@app.task
def add(*args):
    return sum(args)
任务2
# module/task02.py
from app import app


@app.task
def multi(x):
    return x * x
生产者(producer.py)
import task01, task02

result_add = task01.add.delay(3)
result_multi= task02.multi.delay(3)
print(f'add的任务id:{result_add.id}')
print(f'multi的任务id:{result_multi.id}')

定时任务

创建实例
# 参考“基础使用”
import time

import celery


broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://localhost:6379/1'

app = celery.Celery('tasks', backend=backend, broker=broker)


@app.task(ignore_result=True)     # 不需要结果
def send_email(name):
    print(f'Start send email to {name}')
    time.sleep(5)
    print(f'Over')
    return {'code': 200}
生产者
import datetime

from app import add


now = datetime.datetime.now()
send_time = now + datetime.timedelta(seconds=10)
send_time = datetime.datetime.strftime(send_time, '%Y-%m-%d %H:%M:%S')
print(f'now:{now}')
result = send_email.apply_async(args=['abc', ], eta=send_time)
print(result.id)

进阶

TODO