Celery的使用

发布时间 2023-06-19 18:29:55作者: 零哭谷

celery是什么

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。


消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
并发:Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

使用场景

celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计

安装

pip install -U Celery
或着:
sudo easy_install Celery

基本使用

单文件夹下使用

创建celery_test文件夹,pro写生产端,tasks写消费端,result用来获取请求结果

  • 创建消费端代码 tasks.py
import celery
import time

backend = "redis://:密码@ip:port/14"
broker = "redis://:密码@ip:port/15"
cel = celery.Celery('test', backend=backend, broker=broker)

# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
    print("开始发送")
    time.sleep(3)
    return "发送成功%s" % name

在tasks.py的当前目录运行celery的worker
celery -A tasks worker -l info 5版本命令和之前略有不同
出现下面信息表示成功运行

  • 创建生产端代码 Pro.py
    运行后返回一个id值
from tasks import send_sms
ret = send_sms.delay("hahaha ")
print(ret)
  • 创建异步获取结果 result.py
from tasks import cel
from celery.result import AsyncResult
# id是pro运行后获取的id值,app是生产端的实例对象
asyncresult = AsyncResult(id="4b235142-1921-4eff-9e41-27886b273a8e",app=cel)
print(asyncresult.status)
ret = asyncresult.get()
asyncresult.forget()
print(ret)

多文件夹下使用

文件结构如下, mycelery用来保存celery实例的配置

  • mycelery.py 注意此时需要用include关键字,指定异步的任务路径
import celery
backend='redis://:password@ip:port/14'
broker='redis://:password@ip:port/15'
cel=celery.Celery('test',backend=backend,broker=broker,include=[
    'celery_tasks.task01',
    'celery_tasks.task02'
                      ])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
  • task01/02.py
    写入任务函数,注意导包路径
from celery_tasks.mycelery import cel
import time

# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
    print("短信开始发送")
    time.sleep(3)
    return "短信发送成功%s" % name
  • 启动worker,注意启动路径和参数里包含的相对路径celery_tasks.mycelery
    建议后续都从根路径启动workder,并用相对路径指定配置文件的位置,
    这里是在celery_test2的根路径下启动的worker
young_shi@MacBook-Air-2 celery_test2 % ls                                            
celery_tasks    pro.py          result.py
young_shi@MacBook-Air-2 celery_test2 % celery -A celery_tasks.mycelery worker -l info

  • 生产端代码
from celery_tasks.task01 import send_sms
result = send_sms.delay("yuan")
print(result.id)
result2 = send_sms.delay("alex")
print(result2.id)