celery安装和使用

发布时间 2023-07-05 15:42:45作者: 树苗叶子

安装

pip install celery

简单使用

安装完celery后,会生成一个可执行文件:celery,与pip命令在一个目录下,也就是scripts下。

注意:celery官方不支持在windows上启动,如果确需要在windows上启动服务,需要使用一个第三方模块:eventlet支持
pip install eventlet

启动命令

4.x之前版本
Linux:
celery worker -A 模块名 -l info

windows:
celery -A 模块名 worker -l info -P eventlet

4.x之后版本
Linux:
celery -A 模块名 worker -l info

windows:
celery -A 模块名 worker -l info -P eventlet

# 命令说明:
-A 指定模块名,也就是需要做分步式的模块
-l info  日志级别,info就是最普通的info级别
-P 只在windows环境下使用,linux不需要

测试

  1. 建一个celery_main.py文件
import celery
import time

# 提交的异步任务会放到broker中
broker = 'redis://192.168.0.2:6379/1'
# 执行完的结果,会被放到backend中
backend = 'redis://192.168.0.2:6379/2'

# 类实例化得到对象,第一个是指定任务名
app = celery.Celery('test', broker=broker, backend=backend)


@app.task
def add(a, b):
    # 模拟延迟
    time.sleep(5)
    return a + b
  1. 启动服务
# 先进入到celery_main.py目录下,再执行下面的启动命令
celery -A celery_main worker -l info -P eventlet
  1. 获取任务ID号
  • 创建了commit_process.py文件
# 提交任务
from celery_main import add

# 异步调用
res = add.delay(1, 2)
print(res)  # c71ca743-72d2-45d6-84f5-4b929398ad73
  1. 根据任务ID号获取执行结果
  • 创建了get_result.py文件
from celery_main import app

from celery.result import AsyncResult

id = 'fa773d73-0525-4260-8235-d0bc89d53476'
if __name__ == '__main__':
    task = AsyncResult(id=id, app=app)
    if task.successful():
        result = task.get()
        print(result)
    elif task.failed():
        print('任务失败')
    elif task.status == 'PENDING':
        print('任务等待中被执行')
    elif task.status == 'RETRY':
        print('任务异常后正在重试')
    elif task.status == 'STARTED':
        print('任务已经开始被执行')

celery包结构的使用

  • 创建一个celery的包,未来使用的时候,直接导入使用就可以了。

包架构封装结构

project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 这个就是项目中的任务,以add_task为例
    └── get_result.py   # 获取结果

总结

  • 第一步,新建包celery_task,并且新建文件celery.py(必须叫这个名字)
  • celery.py
from celery import Celery

# 提交的异步任务会放到broker中
broker = 'redis://192.168.0.2:6379/1'
# 执行完的结果,会被放到backend中
backend = 'redis://192.168.0.2:6379/2'

# 类实例化得到对象,第一个是指定任务名
# include别忘了加,它是一个列表,下面的例子中的意思是celery_task/task.py
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task'])
  • 第二步,在包内部,写你要写的任务内容,需要使用@app.task装饰
  • tasks.py
# 此文件下,是程序的各种任务
from .celery import app
import time


# 模拟任务
@app.task
def add(a, b):
    print('正在计算中')
    time.sleep(5)
    return a + b
  • 启动worker服务(此步骤可以放在第一步之后任意步骤,也就是建好celery.py后啥时候起都行)
cd scripts
# 注意要与celery_task目录在同级,不要进入到celery_task中执行
celery -A celery_task worker -l info -P eventlet
  • 第三步,提交任务,任务会被提交到中间件中,等待worker执行,worker启动了,就会被worker执行
  • add_task.py
from celery_task.task import add

res = add.delay(1, 2)
print(res)
  • 第四步,worker执行完,结果会放到backend中

  • 第五步,查看结果

  • get_result.py

from celery_task.celery import app

from celery.result import AsyncResult

id = '51400de7-cc62-455a-886f-43fead62a3c2'
if __name__ == '__main__':
    task = AsyncResult(id=id, app=app)
    if task.successful():
        result = task.get()
        print(result)
    elif task.failed():
        print('任务失败')
    elif task.status == 'PENDING':
        print('任务等待中被执行')
    elif task.status == 'RETRY':
        print('任务异常后正在重试')
    elif task.status == 'STARTED':
        print('任务已经开始被执行')