l、Celery

发布时间 2023-08-13 08:03:34作者: 昵称已经被使用

Celery

⼀、Celery简介

Celery 是⼀个基于python开发的异步任务队列/基于分布式消息传递的作业队列,通过它可以轻松的实现任务的异步处理。它侧重于实时操作,但对调度⽀持也很好。Celery⽤于⽣产系统每天处理数以百万计的任务。Celery是⽤Python编写的,但该协议可以在任何语⾔实现。它也可以与其他语⾔通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供⽀持Redis, Beanstalk, MongoDB,CouchDB, 和数据库(使⽤SQLAlchemy的或Django的ORM)。Celery是易于集成Django, Pylons 和 Flask,使⽤ django-celery, celery-pylons and Flask-Celery附加包即可。它的特点:

  • ⽅便查看定时任务的执⾏情况, 如 是否成功, 当前状态, 执⾏任务花费的时间等.
  • 使⽤功能⻬备的管理后台或命令⾏添加,更新,删除任务.
  • ⽅便把任务和配置管理相关联.
  • 可选多进程, Eventlet 和 Gevent 三种模型并发执⾏.
  • 提供错误处理机制.
  • 提供多种任务原语, ⽅便实现任务分组,拆分,和调⽤链.
  • ⽀持多种消息代理和存储后端.
  • Celery 是语⾔⽆关的,它提供了python 等常⻅语⾔的接⼝⽀持.

celery官⽅⽂档:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps

⼆、Celery的相关概念

celery架构图

img

  • task 就是任务,包括异步任务和定时任务
  • broker 中间⼈,接收⽣产者发来的消息即Task,将任务存⼊队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐⽤Redis或RabbitMQ实现队列服务。
  • worker 执⾏任务的单元,它实时监控消息队列,如果有任务就获取任务并执⾏它。
  • backend ⽤于存储任务的执⾏结果。Celery⽀持以不同⽅式存储任务的结果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
  • beat 定时任务调度器,根据配置定时将任务发送给Broler。

三、应⽤场景

  • 异步调⽤:那些⽤户不关⼼的但是⼜存在在我们API⾥⾯的操作 我们就可以⽤异步调⽤的⽅式来优化(发送邮件或者上传头像)
  • 定时任务:定期去统计⽇志,数据备份,或者其他的统计任务

四、Celery的安装

  • 安装
pip install celery
pip install celery-with-redis
#django-celery-results库基于Django ORM实现了结果存储后端
pip install django-celery-results
  • 配置

在 settings.py⽂件中设置

ALLOWED_HOSTS = ['*']
INSTALLED_APPS = (
      ...
      'celery',
      'django_celery_results',  #把 django_celery_results加到INSTALLED_APPS 中
      '⾃⼰的APP'
    }

BROKER_URL='redis://localhost:6379/5'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'              # 任务序列化和反序列化使⽤json
CELERY_RESULT_SERIALIZER = 'json'            # 结果序列化为json
  • 创建celery实例

在settings.py的同级⽬录下新建celery.py

from __future__ import absolute_import #绝对路径导⼊
from celery import Celery
from django.conf import settings
import os

#设置系统的环境配置⽤的是Django的
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "⼯程名字.settings")

#实例化celery
app = Celery('mycelery')

app.conf.timezone = "Asia/Shanghai"
    #指定celery的配置来源 ⽤的是项⽬的配置⽂件settings.py
    app.config_from_object("django.conf:settings")

#让celery ⾃动去发现我们的任务(task)
app.autodiscover_tasks() #你需要在app⽬录下 新建⼀个叫tasks.py(⼀定不要写错)⽂件

在settings.py同级⽬录下的init.py加⼊

from __future__ import absolute_import
from .celery import app as celery_app

五、Celery的使⽤

1、创建任务

在需要使⽤异步任务的APP⽬录下新建tasks.py

from celery import shared_task
import time

@shared_task
def hello_celery(loop):
    for i in range(loop):
        print('hello')
        time.sleep(2)

2、调⽤

在views.py内的调⽤

任务函数名.delay(参数,,,,)

3、⽣成数据库表

python manage.py migrate django_celery_results

4、启动worker

celery -A 你的⼯程名 worker -l info

注意:修改tasks.py的内容后 要重启celery的服务

5 、获取任务执⾏结果

异步任务执⾏完毕后,会⾃动触发信号:

  • before_task_publish
  • after_task_publish
  • task_prerun
  • task_postrun
  • task_success
  • task_failure
  • task_revoked
from celery.signals import task_success
@task_success.connect(sender=add)
def task_done_handler(sender=None,  result=None):
    print(result)

六、定时任务和计划任务

  • 定时任务

    • 启动: celery -A 你的⼯程名称 beat -l info

在settings.py⽂件添加CELERYBEAT_SCHEDULE = { 'schedule-test': { 'task': 'app的名字.tasks.hello_celery', 'schedule': timedelta(seconds=3), 'args': (2,) },}

  • 计划任务时间
#setting.py
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "every-ten-second-run-my_task": {
        "task": "t07.tasks.my_task",
        "schedule": crontab(minute="01", hour="15"),
        "args": (2,)
    }
}
  • 坑:

    • 我们启动定时任务服务时 也要先开启worker

    ​ 如果只开启定时服务 没有开启worker服务 那么定时任务会被放⼊任务队列,但是由于没有⼲活⼉的worker 那么任务是不会被执⾏,当worker服务被启动后 会⽴刻去任务队列领任务并执⾏

  • 你的任务⼀定要确保是可以正常执⾏的

七、其它

1、查看异步任务情况

Celery提供了⼀个⼯具flower,将各个任务的执⾏情况、各个worker的健康状态进⾏监控并以可视化的⽅式展现,

  1. 安装flower:

    pip install flower 
    
  2. 启动flower(默认会启动⼀个webserver,端⼝为5555):

    celery flower --broker=redis://localhost:6379/5
    
  3. 即可查看

    http://localhost:5555

2、内存泄漏

  1. 说明

    ⻓时间运⾏Celery有可能发⽣内存泄露,可以像下⾯这样设置

  2. 示例代码

    CELERYD_MAX_TASKS_PER_CHILD = 1000 # 每个worker执⾏了多少任务就会死掉
    

常⽤配置清单

  1. 说明

  2. 配置信息

     #from kombu import Queue, Exchange
     
     # 设置Broker和backend
     BROKER_URL = 'redis://127.0.0.1:6379/0'  
     
     # 将数据存放到redis1数据库,redis默认有16个数据库
     CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
      
     CELERY_TASK_SERIALIZER = 'json'              # 任务序列化和反序列化使⽤json
     CELERY_RESULT_SERIALIZER = 'json'            # 结果序列化为json
     CELERY_ACCEPT_CONTENT = ['json']             # 分布式接受数据的类型为json
     CELERY_TIMEZONE = 'Asia/Shanghai'             #使⽤中国上海时区
     CELERY_ENABLE_UTC = True                       
     
     CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 后端存储任务超过⼀天,则⾃动清理数据,单位为秒
     CELERYD_MAX_TASKS_PER_CHILD = 1000           # 每个worker最多执⾏1000个任务就会被销毁,可防⽌内存泄露