celery 分布式异步消息任务队列

发布时间 2023-10-18 20:27:17作者: 凡人半睁眼

一、介绍

中文网:Celery 初次使用 - Celery 中文手册 (celerycn.io)

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

1、celery基于python开发的分布式异步消息任务队列

# celery是什么?
	分布式异步任务框架:第三方框架,celery翻译过来是芹菜,吉祥物就是芹菜
    项目中使用异步任务的场景,可以使用它
    之前做异步,如何做? 异步发送短信---》开启多线程---》不便于管理

# celery有什么作用?
    -执行异步任务
    -执行延迟任务
    -执行定时任务

# celery原理
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    
    
    django如果不用异步,正常运行即可,如果想做异步,就借助于 celery来完成

2、celery架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

 

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

Celery 组件

Celery 扮演生产者和消费者的角色

    • Producer :
      任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者。

    • Celery Beat :
      任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列。

    • Broker :
      消息代理, 队列本身. 也称为消息中间件。接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

    • Celery Worker :
      执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率。

    • Result Backend :
      任务处理完成之后保存状态信息和结果, 以供查询。

二、简单使用

1、安装

pip install celery  # 最新 5.3.4

2、写一个main.py

指定了中间件、后端存储都有redis(1,2为库),任务

import time
from celery import Celery
# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis
app = Celery('app', broker=broker, backend=backend)

# 编写任务,必须用app.task 装饰,才变成了celery的任务
@app.task
def send_sms():
    time.sleep(1)
    print('短信发送成功')
    return '手机号短信发送成功'

3、提交任务,使用别的进程

add_task.py

from main import send_sms
res=send_sms.delay() 
print(res)

4、在终端中启动worker

注意:需要切换路径到main文件所在路径

windows:

pip3 install eventlet
celery -A main worker -l info -P eventlet

mac linux

celery -A main worker -l info

5、worker就会执行任务,把执行的结果,放到结果存储中

get_result.py

from celery.result import AsyncResult
from main import app
id = '92987636-ae9e-4be9-828b-8c2d10fe066a'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')