celery 之 celery介绍架构和安装、celery执行异步任务、包结构celery、celery执行延迟任务和定时任务、django中使用celery、接口缓存

发布时间 2023-06-29 19:47:52作者: 岳宗柯

目录

一、celery介绍架构和安装

1、celery :分布式的异步任务框架,主要用来做:

- 异步任务
- 延迟任务
- 定时任务---》如果只想做定时任务,可以不使用celery,有别的选择

2、celery 框架,原理

1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

3、celery架构

消息中间件(broker):消息队列:可以使用redis,rabbitmq,咱们使用redis
任务执行单元(worker):真正的执行 提交的任务
任务执行结果存储(banckend):可以使用mysql,redis,咱们使用redis

4、安装celery

    -pip install Celery
    -释放出可执行文件:celery,由于 python解释器的script文件夹再环境变量,任意路径下执行celery都能找到

5、celery不支持win,所以想再win上运行,需要额外安装eventlet

windows系统需要eventlet支持:pip3 install eventlet
Linux与MacOS直接执行:
    3.x,4.x版本:celery worker -A demo -l info
    5.x版本:     celery -A demo worker -l info -P eventlet

二、celery执行异步任务

基本使用

1 再虚拟环境中装celery和eventlet

2 写个py文件,实例化得到app对象,注册任务

    from celery import Celery
    import time
    broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
    backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis
    app = Celery(__name__, broker=broker, backend=backend)
    @app.task # 变成celery的任务了
    def add(a, b):
        print('运算结果是',a + b)
        time.sleep(1)
        return a + b

3 启动worker(worker监听消息队列,等待别人提交任务,如果没有就卡再这)

       # 路径要切换到启动文件包的路径
    celery -A demo worker -l info -P eventlet

image

4 别人 提交任务,提交完成会返回一个id号,后期使用id号查询,至于这个任务有没有被执行,取决于worker有没有启动

	from demo import add
	res=add.delay(77,66)

image

image

5 提交任务的人,再查看结果(固定写法)

from demo import app
    # celery的包下
    from celery.result import AsyncResult
	# 这里注意改变查找结果的id号
    id = '042a8fc1-6b0f-4ad6-bf72-edefa657a52f'
    if __name__ == '__main__':
        a = AsyncResult(id=id, app=app)
        if a.successful():  # 正常执行完成
            result = a.get()  # 任务返回的结果
            print(result)
        elif a.failed():
            print('任务失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
        elif a.status == 'RETRY':
            print('任务异常后正在重试')
        elif a.status == 'STARTED':
            print('任务已经开始被执行')

秒杀场景
image

三、包结构celery

使用步骤:

1 新建包:celery_task

2 再包下新建 celery.py 必须叫它,里面实例化得到app对象

from celery import Celery
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])

3 新建任务py文件:user_task.py course_task.py home_task.py

	-以后跟谁相关的任务,就写在谁里面
    from .celery import app
    import time
    @app.task
    def send_sms(mobile, code):
        time.sleep(2)
        print('%s手机号,发送短信成功,验证码是:%s' % (mobile, code))
        return True

4 启动worker,以包启动,来到包所在路径下

    celery -A 包名 worker -l info -P eventlet
    celery -A celery_task worker -l info -P eventlet

image

5 其它程序,导入任务,提交任务即可

    from celery_task.user_task import send_sms
    res = send_sms.delay(1999999333, 8888)
    print(res)  # f33ba3c5-9b78-467a-94d6-17b9074e8533

6 其它程序,查询结果

from celery_task.celery import app
# celery的包下
from celery.result import AsyncResult

id = '51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 正常执行完成
        result = a.get()  # 任务返回的结果
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

四、 celery执行延迟任务和定时任务

celery 可以做

-异步任务
-延迟任务---》延迟多长时间干任务
-定时任务:每天12点钟,每隔几秒。。。
	-如果只做定时任务,不需要使用celery这么重,apscheduler(自己去研究)

1、异步任务

-导入异步任务的函数
-函数.delay(参数)

2、延迟任务

-导入异步任务的函数
-函数.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=时间对象)
# send_sms.apply_async(args=['15266783547',8888])
from datetime import datetime, timedelta
# eta=datetime.utcnow() + timedelta(seconds=10)
# print(datetime.now())
# print(type(datetime.utcnow()))  # datetime.datetime   # 咱们写延迟任务要用utc时间,没有做时间国际化
# print(timedelta(minutes=3,seconds=2)) #
# print(type(timedelta(hours=1))) #它可以跟datetime.datetime 做相加

eta=datetime.utcnow() + timedelta(seconds=10)

res=send_sms.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=eta)
print(res)

image

3.定时任务:在app所在的文件下配置

- 1 配置(在celery文件中)

### 3 定时任务要写在这里
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_task.user_task.send_sms',
        'schedule': timedelta(seconds=5),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('1822344343', 8888),
    },
    'add_course': {
        'task': 'celery_task.course_task.add_course',
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'schedule': crontab(hour=11, minute=38),  # 每天11点35,执行
        'args': (),
    }
}

-2 启动beat,启动worker

 celery -A celery_task worker -l info -P eventlet

image

celery  -A celery_task beat -l info 

image

-3 到了时间,beat进程负责提交任务到消息队列---》worker执行

五、django中使用celery

使用步骤

1 把之前写好的包,copy到项目根路径下

2 在xx_task.py 中写任务

from .celery import app
    @app.task
    def add_banner():
        from home.models import Banner
        Banner.objects.create(title='测试', image='/1.png', link='/banner', info='xxx',orders=99)
        return 'banner增加成功'

3 在celery.py 中加载django配置

# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

4 视图函数中,导入任务,提交即可

class CeleryView(APIView):
     def get(self, request):
         res = add_banner.delay()
         return APIResponse(msg='新增banner的任务已经提交了')

5 启动worker,等待运行即可

六、接口缓存

所有接口都可以改造,尤其是查询所有的这种接口,如果加入缓存,会极大的提高查询速度

首页轮播图接口:获取轮播图数据,加缓存---》咱们只是以它为例


class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        '''
          1 先去缓存中查一下有没有数据
          2 如果有,直接返回,不走父类的list了(list在走数据库)
          3 如果没有,走父类list,查询数据库
          4 把返回的数据,放到缓存中
        '''

        data = cache.get('home_banner_list')
        if not data:  # 缓存中没有
            print('走了数据库')
            res = super().list(request, *args, **kwargs)  # 查询数据库
            # 返回的数据,放到缓存中
            data = res.data.get('data')  # {code:100,msg:成功,data:[{},{}]}
            cache.set('home_banner_list', data)
        return APIResponse(data=data)


    
# 公司里可能会这么写
	-写一个查询所有带缓存的基类
    -写个装饰器,只要一配置,就自动带缓存
    
    
# 双写一致性问题:缓存数据和数据库数据不一致了
	-写入数据库,删除缓存
    -写入数据库,更新缓存
    -定时更新缓存