29-Celery基本配置

发布时间 2023-03-28 18:45:45作者: 测试圈的彭于晏
# Celery 是一个基于python开发的异步任务队列/基于分布式消息传递的作业队列,通过它可以很轻松的实现任务的异步处理
# 官方网站:
    https://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html

一. celery的相关概念

celery架构图

# 1. task 就是任务,包括异步任务和定时任务

# 2. broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。
   # Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务

# 3. worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它

# 4. backend 用于存储任务的执行结果。Celery支持以不同方式存储任务的结果,
   # 包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, lronCache等 

# 5. beat 定时任务调度器,根据配置定时将任务发送给Broler。

二. 应用场景

# 异步调用: 那些用户不关心的但是又存在在我们API里面的操作 我们就可以用异步调用的方式来优化(发送邮件或者上传头像)

# 定时任务:定期去统计日志,数据备份,或者其他的统计任务

三. Celery的安装

# 1. 安装
    pip install celery==4.4.1
    pip install celery-with-redis==3.0
    pip install django-celery-results==1.2.0  #django-celery-results库基于 Django ORM实现了结果存储后端
    pip install django-celery==3.3.1
# 2. 配置settings.py
  #注册celery
    INSTALLED_APPS = [
         ...
        'celery',
        'django_celery_results'
    ]

  # celery配置
    BROKER_URL = 'redis://:root@192.168.00.000:6379/5' # @前面是redis密码,密码前必须加 : 
    # BROKER_URL = 'redis://localhost:6379/5' # 无密码
    CELERY_RESULT_BACKEND = 'django-db'
    CELERY_TASK_SERIALIZER = 'json'  # 任务序列化和反序列化使用json
    CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化为json
# 3. 配置完后,进行数据库迁移(生成:django_celery_results_taskresult 表)
    python manage.py migrate
# 4. 在settings.py同级目录下新建celery.py : 创建celery实例
    from __future__ import absolute_import  # 绝对路径导入
    from celery import Celery
    from django.conf import settings
    import os

    # 设置工程配置文件
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'new.settings')  # new.settings: 主应用名.settings

    # 实例化celery对象
    # 第一个参数是应用名称,必须给
    app = Celery('mycelery')

    # 设置时区
    app.conf.timezone = 'Asia/Shanghai'

    # 读取配置文件
    app.config_from_object('django.conf:settings')

    # 自动发现任务
    app.autodiscover_tasks()  # 需要在app目录下,新建一个叫tasks.py(一点不要写错)文件
# 5. 在settings.py同级目录下的__init__.py加入
    from __future__ import absolute_import
    from .celery import app as celery_app # 重命名
# 6. 在app目录下,新建一个叫tasks.py(一点不要写错)文件

四. Celery 的使用

# 1. 创建任务 : 在需要使用异步任务的APP目录下新建tasks.py
    from celery.signals import task_success
    from celery import shared_task
    # 任务: 可执行的函数 / 类
    # 使用装饰器装饰起来的就是任务
    @shared_task
    def hello_world(n):
        for i in range(n):
            print("Hello World")
# 2. 创建路径urls.py
      # celery 测试
      path('task/', views.exec_task, name="task"),
# 3. 视图函数执行任务
    def exec_task(request):
        from App02.tasks import hello_world
        # 把任务添加到任务队列
        hello_world.delay(4)  # 任务函数名.delay(参数)
        return HttpResponse("celery")
# 4. 执行数据库迁移指令(生成数据库表)-忽略,之前已生成
      python manage.py migrate django_celery_results
# 5. 启动worker(异步任务)
    # 方案一: 使用--pool参数
      celery -A 主应用名(new) worker -l info --pool=solo
    
    # 方案二: 指定 gevent
      pip install gevent
      celery -A app_name worker -l info -P gevent

    # 方案三: 指定 eventlet
      pip install eventlet
      celery -A app_name worker -l info -P eventlet  -c 10