celery

发布时间 2023-11-13 16:38:10作者: Meeeoww

介绍

# celery是什么?
	分布式异步任务框架:第三方框架,celery翻译过来是芹菜,吉祥物就是芹菜
    项目中使用异步任务的场景,可以使用它
    之前做异步,如何做? 异步发送短信---》开启多线程---》不便于管理

# celery有什么作用?
	-执行异步任务
    -执行延迟任务
    -执行定时任务
    
# celery原理
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    
    
    django如果不用异步,正常运行即可,如果想做异步,就借助于 celery来完成
    
    
    
    
# celery架构
	-broker:消息中间件,任务中间件(消息队列:redis,rabbitmq)
    	django要做异步,提交任务到 任务中间件中(redis),存储起来
        Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
    -worker:任务执行者,任务执行单元
    	不停的从任务中间件中取任务,执行
        Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
    -backend:结果存储,任务结果存储
    	把任务执行结果(函数返回值),存放到结果存储中(redis)
        用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
     


###  任务中间件:redis
###  结果存储:redis

celery的快速使用

# 0 开源的,小组织,不支持win,不要就win的问题展开讨论了
	win上:需要借助于第三方

1 安装:
 pip install celery  # 最新 5.3.4
    
2 写代码 
* scripts\t_celery\main.py
import time
from celery import Celery
# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis,1表示第一个db库
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis,2表示第二个db库
# broker和backend不要用同一个库,否则数据容易乱
app = Celery('app', broker=broker, backend=backend)

# 编写任务,必须用app.task 装饰才变成了celery的任务,否则就是普通函数
@app.task
def send_sms():
    time.sleep(1)
    print('短信发送成功')
    return '手机号短信发送成功'


3 提交任务,使用别的进程
* scripts\t_celery\add_task.py
from main import send_sms
res=send_sms.delay() #就能在resp中看到celery提交的任务
print(res) #cdcea241-2353-439f-8069-539147f4361a


4 执行任务
启动worker---》可以在3之前
celery -A tasks worker --loglevel=INFO
#Windows下启动Worker :
pip3 install eventlet
#终端执行命令,注意路径切换cd scripts >cd t_celery
celery -A main worker -l info -P eventlet
# main为包含app代码py文件名
# mac\linux下启动Worker :
celery -A main worker -l info


5 worker就会执行任务,把执行的结果,放到结果存储中

6 查看结果 
* scripts\t_celery\get_result.py
from celery.result import AsyncResult
from main import app
id = '92987636-ae9e-4be9-828b-8c2d10fe066a'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

** 常用命令**

# mac\linux启动Worker 
celery -A tasks worker --loglevel=INFO 

# Windows下启动Worker 
celery -A tasks worker --loglevel=INFO -P eventlet 

# 关闭Worker 
Ctrl + C,按两次关闭终端
 
#启动Beat程序 可以帮我们定时发送任务到消息队列 
celery -A tasks beat --loglevel=INFO
  • scripts\t_celery\main.py
import time
from celery import Celery
# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis,1表示第一个db库
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis,2表示第二个db库
# broker和backend不要用同一个库,否则数据容易乱
app = Celery('app', broker=broker, backend=backend)

# 编写任务,必须用app.task 装饰才变成了celery的任务,否则就是普通函数
@app.task
def send_sms():
    time.sleep(1)
    print('短信发送成功')
    return '手机号短信发送成功'
  • scripts\t_celery\add_task.py
from main import send_sms

#1 同步执行
# res = send_sms()
# print(res)

# 2 异步发送短信
res=send_sms.delay() #返回结果不是send_sms的返回值,它是一个任务id号, 其实这个任务还没执行呢,只是提交到了任务中间件中了(redis)
print(res) #cdcea241-2353-439f-8069-539147f4361a

终端执行celery -A main worker -l info -P eventlet

  • scripts\t_celery\get_result.py
from celery.result import AsyncResult
from main import app
id = '92987636-ae9e-4be9-828b-8c2d10fe066a'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery 包结构

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果
    
  

1 创建 celery_task  包,包内部有celery.py和一堆task-->['celery_task.home_task','celery_task.user_task']
    
2 celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis
app = Celery('app', broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

3 每个task,写自己相关的任务

4 启动worker
celery -A celery_task worker -l info -P eventlet

5 提交任务
from celery_task.home_task import add
res=add.delay(3,4)
print(res)

6 查看结果
from celery_task.celery import app
from celery.result import AsyncResult

id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

  • luffy_api\celery_task\celery.py
from celery import Celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=Celery('app',broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])
  • luffy_api\celery_task\home_task.py
import time
from .celery import app


@app.task
def add(a, b):
    time.sleep(2)
    return a + b

  • luffy_api\celery_task\user_task.py
import time
from .celery import app


@app.task
def send_sms(phone):
    time.sleep(3)
    return f'{phone}发送成功'

  • add_task.py
from celery_task.home_task import add
from celery_task.user_task import send_sms
res=add.delay(3,4)
# print(res)
send_sms.delay(13788938684)
  • get_result.py
from celery_task.celery import app
from celery.result import AsyncResult

id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery 延迟任务和定时任务

异步任务

### 提交任务,使用delay提交即可
from celery_task.home_task import add
res=add.delay(3,4)
print(res)

延迟任务

## 提交延迟任务   apply_async
from datetime import datetime, timedelta

print(datetime.utcnow())  # utc 时间,跟咱们差8个小时
# eta 就是 10s 后的时间
eta = datetime.utcnow() + timedelta(seconds=30)
res = send_sms.apply_async(args=(18953675221,), eta=eta)
print(res)
"""
eta时间对象	
apply()方法是阻塞的,等待当前子进程执行完毕后,再执行下一个进程。
apply_async()是异步非阻塞式,即多个进程并行执行,提高程序的执行效率。
"""

定时任务

1 在celery.py中
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    #任务add
    'add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (5, 6),
    },
    #任务send_sms
    'send_sms': {
        'task': 'celery_task.user_task.send_sms',
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=9, minute=39),  # 每天9点39,执行
        'args': (18923748221,),
    },

}



2 终端一启动worker
celery -A celery_task worker -l info -P eventlet

3 终端二启动beat(它来定时提交任务)
celery -A celery_task beat -l info
  • celery.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis
app = Celery('app', broker=broker, backend=backend, include=['celery_task.home_task', 'celery_task.user_task'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (5, 6),
    },
    'send_sms': {
        'task': 'celery_task.user_task.send_sms',
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=9, minute=39),  # 每天9点39,执行
        'args': (18923748221,),
    },

}

django中使用celery

# 通用方案
1 把咱们之前写的包,放到项目路径下

2 提交异步或延迟任务,导入直接提交即可
 	from celery_task.user_task import cache_demo
    res = cache_demo.delay('phone','1872324242')
    
3 只要启动worker,这些任务就会被执行
4 如果要使用django中的东西(配置文件,缓存,orm。。。),都需要在celery.py中写
    import os
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
    
5 使用django内置东西的任务
@app.task
def cache_demo(key, value):
    cache.set(key, value)
    return '缓存成功'
  • luffy_api\celery_task\celery.py
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=Celery('app',broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])