celery笔记

发布时间 2023-09-03 11:16:21作者: 山雨欲來風滿楼

celery介绍

1.它是什么?

  • 分布式的异步任务框架

  • 直译为: 芹菜 [ /ˈseləri ]

2.可以做什么?

  • 异步任务。(异步执行函数)

  • 延迟任务。(延迟5s任务(函数))

  • 定时任务。(例如:每天23点触发测试)[如果单纯执行定时任务,没必要用celery]

3.平台问题

  • celery is a project with minimal funding ,so we don't support Microsoft Windows. Please don't open any issues related to that platform.

4.读一下

  1. 可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)

  2. celery服务为其他醒目服务提供异步解决任务需求的。

  3. 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时,异步完成项目的需求。

  4. 例如:人是一个独立运行的服务 | 医院也是一个独立运行的服务。

5.celery的架构由三部分组成

  1. 消息中间件(message broker)

    • celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,rabbitMQ,Redis等等

  2. 任务执行单元(worker)

    • worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中

  3. 任务执行结果存储(task result store)

    • Task result store 用来存储worker执行的任务结果,celery 支持以不同方式存储任务的结果,包括AMQP,redis,mysql等

如何安装?

1. pip install celery

如何使用?

方式1.普通的

  1. 第一步: 定义一个py文件(名字随意,我们这里叫celery_task)
    from celery import Celery
    backend = 'redis://127.0.0.1:6379/1'    # 执行任务的结果,存放在redis的1号仓库
    broker = 'redis://127.0.0.1:6239/2'     # 消息队列,存放在redis的2号仓库
    app = Celery(__name__,broker=broker,backend=backend)
    ​
    # 被下面这个修饰,就变成了celery的任务
    @app.task
    def add(a,b):
        return a+b

     

  2. 第二步:提交任务(新建一个py文件:submit_task)
    from celery_task import add
    # 异步调用
    # 这一步只是把任务提交到了redis中,但是没有执行。提交后会返回一个唯一标识,后期用这个唯一标识去看任务执行结果。
    res=add.delay(55,77)
    print(res)  # 我这里的唯一标识是 'ed85b97a-a231-4d71-ba11-e5f6450d0b47'
  3. 第三步:启动worker(在命令行启动)
    cd 到相关目录
    celery -A celery_task worker -l info -P eventlet
    # -A 指的要启动的文件,这里指celery_task.py(后缀名可以省略)
    # -l 日志级别是info
    # -P eventlet 在win平台需要加上
    # 注意1:如果队列有任务,他就立马执行;如果没有任务,就会阻塞在原地;除非kill 掉,否则一直在原地等地任务。
    # 注意2:根据我之前的设置,我这个worker是去消息中间件redis的2号仓库中领取任务,执行完任务,会把结果放到redis的1号仓库。
  4. 第四步:查询结果是否执行完成
    from celery_task import app
    from celery.result import AsynResult
    id = 'ed85b97a-a231-4d71-ba11-e5f6450d0b47' # 上面的唯一标识
    ​
    if __name__=='__main__':
        a = AsynResult(id=id,app=app)
        if a.sucessful():
            result=a.get()
            print("任务成功:结果为 ",result)
        elif a.failed():
            print('任务失败')
        elif a.staus =='PENDING':
            print('任务等待中被执行')
        elif a.staus =='RETRY':
            print('任务异常正在重试')
        elif a.staus =='STARTED':
            print('任务已经开始被执行')

方式2.包管理的【官方推荐】