celery简介与安装

发布时间 2023-09-04 14:24:43作者: 7dao

前言

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。

可以使用的场景如

  • 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

 

Celery 简介

 

Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。

 

该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

 

接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?

 

 

 

celery 5个角色

 

  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是WorkerCelery 本身不提供队列服务,推荐用RedisRabbitMQ实现队列服务。
  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat 定时任务调度器,根据配置定时将任务发送给Broker
  • Backend 用于存储任务的执行结果。

 如下图所示

 

 celery的demo

  安装命令: pip install celery

  

from celery import Celery
# import  redis
# 创建一个Celery应用程序
#redis 设置了密码
app = Celery('mycelery', backend='redis://:000222@150.158.47.37', broker='redis://:000222@150.158.47.37')


#redis未设置密码
# app = Celery('my_app', backend='redis://150.158.47.37', broker='redis://150.158.47.37')

#@app.task 将函数转为celery任务
#@shared_task:这个装饰器与@task类似,但是它创建的任务是共享的,可以在多个应用程序中使用
@app.task def add_numbers(x, y): return x + y if __name__ == '__main__':
  #delay 触发任务 res = add_numbers.delay(3, 5) print(res) print(res.get()) # 等待结果并将其打印出来
  

  task任务装饰器:

  1. @task:这是最基本的任务装饰器,用于将一个函数转换为Celery任务。它可以带有参数来配置任务的行为。

  2. @shared_task:这个装饰器与@task类似,但是它创建的任务是共享的,可以在多个应用程序中使用。

  3. @task(bind=True):当您需要访问任务实例本身时,可以使用此装饰器。通过指定bind=True参数,任务函数的第一个参数将变成任务实例自身。

  4. @task(name='custom_name'):通过name参数,您可以为任务指定一个自定义的名字,而不是默认的函数名。

  5. @task(queue='custom_queue'):使用queue参数可以将任务分配给特定的队列,而不是默认队列。

常用的几个属性

  • res.task_id 任务id唯一的,可以根据id拿到结果
  • res.status 任务状态:PENDING、STARTED、RETRY、FAILURE、SUCCESS
  • res.get() 任务运行结果,必须要任务状态是'SUCCESS',才会有运行结果
  • r.successful() 返回布尔值,执行成功返回True
  • r.ready() # 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
  • r.wait() # 等待任务完成, 返回任务执行结果.
  • r.result # 任务执行结果.
  • r.state # 和res.status一样,任务状态:PENDING, START, SUCCESS