celery-介绍和安装

发布时间 2023-04-24 21:04:37作者: ERROR404Notfound

1.celery介绍

Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

celery是python一个框架,与django无关,可以用在django中,也能用在flask中,运行起来就是一个服务。
它的功能:
1.异步任务
2.定时任务
3.延迟任务

celery的运行原理:
1.可以不依赖任何服务器,通过自身命令启动服务
2.celery服务为其他项目提供异步解决任务需求,比如有两个服务同时运行,一个是服务项目,一个是celery服务,项目服务有需求,celery就会在需要的时候异步完成项目的需求

"""
一个很形象的例子:
人是一个独立运行的服务,医院也是一个独立运行的服务。正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题。
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
image
celery由以下几部分构成(broker和backend都使用redis):
1.任务中间件(broker),其他服务提交的异步任务,放在里面排队。Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等。
2.执行任务单元(worker)
3.结果存储(backend),对应到函数中就是函数的返回结果存储到backend中。Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
django就相当于生产者,worker就相当于消费者

使用场景:
1.异步执行:解决耗时任务
2.延迟执行:解决延迟任务
3.定时执行:解决周期任务

2.celery快速使用、异步任务

安装:

pip install celery

安装完成会有一个可执行文件:celery.exe
image

我们首先演示一个同步调用的效果,同步调用因为有time.sleep,所以需要等三秒之后才能拿到结果:
main.py:

from celery import Celery

'''提交的异步任务放在redis中的第1个库'''
broker = 'redis://127.0.0.1:6379/1'
'''执行结果放在redis中的第2个库'''
backend = 'redis://127.0.0.1:6379/2'

app = Celery('test', broker=broker, backend=backend)

@app.task
def add(a, b):
    import time
    time.sleep(3)
    return a + b

s1.py:

from main import add

res = add(2,3)
print(res)  # 5

接下来我们演示异步操作:
1.main.py和之前代码没有变过:

from celery import Celery

'''提交的异步任务放在redis中的第1个库'''
broker = 'redis://127.0.0.1:6379/1'
'''执行结果放在redis中的第2个库'''
backend = 'redis://127.0.0.1:6379/2'

app = Celery('test', broker=broker, backend=backend)

@app.task
def add(a, b):
    import time
    time.sleep(3)
    return a + b

2.在s1.py中需要将代码修改为异步执行,然后执行该代码,就是将该任务提交到broker:
s1.py:

from main import add

res = add.delay(5,6)
print(res)

执行完毕之后命令行会弹出一个随机字符串:
image
3.启动work之前win系统需要安装一个模块(linux和mac不需要):

pip install eventlet

4.启动worker:将路径切到main.py所在的目录下(D:\上海python金牌班\20230306 路飞\day08\luffy\luffy_api\scripts\celery_t),执行操作(启动worker操作只需要做一次就好,之后可以不用做):

	win:
       celery4.x之前版本
		celery worker -A main -l info -P eventlet
       celery4.x之后
    	celery  -A main  worker -l info -P eventlet
	mac:
       celery  -A main  worker -l info

然后查看backend指定的数据库查看,可以看到执行结果在result中:
image
但是我们查看不应该在redis中查看,而是在pycharm中查看,这是我们再建一个py文件s2.py:
s2.py

from main import app
from celery.result import AsyncResult
id = 'aa6dc05c-1108-48de-8a8d-0870eff9c25b'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

执行这个文件就可以在终端看到执行结果:
image

3.celery包结构

以上代码可以执行任务,但是我们象吧celery做成一个包结构,在需要的时候直接导入这个包就可以。我们首先需要建立这样的目录结构:
project
├── celery_task # celery包
│ ├── init.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数(之前写在main中的任务)
├── add_task.py # 添加任务
└── get_result.py # 获取结果
我们需要按照步骤执行一下操作:
1.首先需要新建一个包:celery_task,在该包下新建celery.py,该文件下生成了一个Celery对象app,app内需要指定broker和backend。
celery_task/celery.py:

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])
'''导入需要从包这一层开始导(scripts下)'''

2.之前在main.py中我们生成了Celery对象并且写了方法。但是由于实际业务中方法较多,我们需要分类写在不同的py文件中(也就是目录结构中的task.py):
celery_task/user_task.py:

from .celery import app
import time

@app.task
def send_msg(phone,code):
    '''模拟发送短信'''
    print('给%s发送短信成功,验证码为:%s'%(phone,code))
    time.sleep(3)
    return True

celery_task/order_task.py:

from .celery import app
import time

@app.task
def add(price, count):
    print('总价格为%s' % price * count)
    time.sleep(2)
    return price * count

3.在celery_task同级别新建一个py文件:add_task.py:
add.task.py:

from celery_task.user_task import send_msg

res = send_msg.delay('18888',5678)
print(res)  # 735ba869-41fa-4af6-aa6e-d3f8c1d34787

这时任务已经提交上去:
image
4.启动worker:

celery  -A celery_task  worker -l info -P eventlet
'''执行该命令需要到celery_task这一级目录'''

image
image

5.在scripts中新建一个文件:get_result.py
get_result.py(和之前s2代码一致)

from main import app
from celery.result import AsyncResult
id = 'aa6dc05c-1108-48de-8a8d-0870eff9c25b'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():  # 执行完了
        result = a.get()  #
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

image

4.延迟任务

关于时间的两个重要补充:
timedelta(days=10)是一个可以加减的时间类型:

print(type(timedelta(days=10)))  # <class 'datetime.timedelta'>
print(datetime.now()+timedelta(days=10))  # 2023-03-19 15:23:16.295510

立即异步执行:方法名.delay(方法参数)

res = send_msg.delay('18888',5678)

延迟任务:方法名.apply_async(args=[方法参数],eta=时间)
延迟任务执行步骤:
1.还是用user_task中的方法send_msg,首先在add_task中写延迟任务:

eta = datetime.utcnow() + timedelta(seconds=10)
res = send_msg.apply_async(args=['188888', '4567'], eta=eta)
print(res)  # 42e6d19a-1760-41fb-92b3-dc37f42fd078

2.开启worker(也可以在执行add_task之后执行):

celery -A celery_task worker -l info -P eventlet

3.执行add_task方法:
image
image

5.定时任务

在指定的时间执行指定的方法
1.写延迟任务我们是在add_task中写,但是写定时任务我们需要在celery.py中写:
celery_task/celery.py:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

# 这两步如果没有配置需要使用utc时间,配置才能使用配置时区的时间
app.conf.timezone = 'Asia/Shanghai'
# 是否使用utc时间
app.conf.enable_utc = False
# 任务的定时配置
app.conf.beat_schedule = {  # 可以配多个任务
    'send_sms': {  # 名字可以随便起
        'task': 'celery_task.user_task.send_msg',  # 将需要定时执行的方法写在这里
        'schedule': timedelta(seconds=3),  # 每个3秒执行一次
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        # 'schedule': crontab(hour=16, minute=16),  # 每天9点43
        'args': ('13260676190', '6666'),  # 如果方法没有参数这一项就不写
    },
}

celery_task/user_task.py:

from .celery import app
import time

@app.task
def send_msg(phone, code):
    '''模拟发送短信'''
    print('给%s发送短信成功,验证码为:%s' % (phone, code))
    time.sleep(3)
    return True

2.定时任务我们需要每个多少秒或者是未来某个时间将任务提交到broker,这个需要启动beat来帮我们完成。
beat:定时提交任务进程(配置在app.conf.beat_schedule中的任务)
worker:执行任务
启动beat和worker

celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
'''导入需要从包(celery_task)这一层开始导(scripts)'''

image
image
补充:如果我们在公司中只做定时任务,那么我们可以采用APSchedule框架,参考博客:https://blog.csdn.net/qq_41341757/article/details/118759836

6.django中使用celery

6.1 使用步骤

注意:celery中使用djagno,有时候任务中会使用django的orm,在celery中一定要加以下代码:

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

1.将我们写的包复制到项目目录下:
image
2.在使用异步提交任务的位置,导入使用即可
在视图中导入任务
from celery_task.celery import app
3.启动worker,如果有定时任务就启动beat
4.等待人物被worker执行
5.在视图函数中查询任务执行结果

6.2 模拟秒杀任务

秒杀逻辑分析:
1 前端秒杀按钮,用户点击---》发送ajax请求到后端
2 视图函数---》提交秒杀任务---》借助于celery,提交到中间件中了
3 当次秒杀的请求,就回去了,携带者任务id号在前端
4 前端开启定时任务,每隔3s钟,带着任务,向后端发送请求,查看是否秒杀成功
5 后端的情况
1 任务还在等待被执行----》返回给前端,前端继续每隔3s发送一次请求
2 任务执行完了,秒杀成功了---》返回给前端,恭喜您秒杀成功--》关闭前端定时器
3 任务执行完了,秒杀失败了---》返回给前端,秒杀失败--》关闭前端定时器
user/views.py

from rest_framework.viewsets import ViewSet

from celery_task.order_task import sckill_task
from celery_task.celery import app
from celery.result import AsyncResult


class SckillView(ViewSet):
    @action(methods=['GET'], detail=False)
    def sckill(self, request):
        a = request.query_params.get('id')
        # 使用异步,提交一个秒杀任务
        res = sckill_task.delay(a)
        return APIResponse(task_id=res.id)

    @action(methods=['GET'], detail=False)
    def get_result(self, request):
        task_id = request.query_params.get('task_id')
        a = AsyncResult(id=task_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='秒杀成功')
            else:
                return APIResponse(code=101, msg='秒杀失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(code=666, msg='还在秒杀中')

order_task.py:

# 秒杀任务
import random
import time


@app.task
def sckill_task(good_id):
    # 生成订单,减库存,都要在一个事务中
    print("商品%s:秒杀开始" % good_id)
    # 这个过程,可能是1,2,3s中的任意一个
    time.sleep(random.choice([6, 7, 9]))
    print('商品%s秒杀结束' % good_id)

    return random.choice([True, False])

前端:

<template>
  <div>

    <button @click="handleSckill">秒杀</button>
  </div>
</template>

<script>
import Header from '@/components/Header';
import Banner from '@/components/Banner';
import Footer from '@/components/Footer';

export default {
  name: 'Sckill',
  data() {
    return {
      task_id: '',
      t: null
    }
  },
  methods: {
    handleSckill() {
      this.$axios.get(this.$settings.BASE_URL + '/user/sckill/sckill/?id=999').then(res => {
        this.task_id = res.data.task_id
        this.t = setInterval(() => {
          this.$axios.get(this.$settings.BASE_URL + '/user/sckill/get_result/?task_id=' + this.task_id).then(res => {
            if (res.data.code == 666) {
              //如果秒杀任务还没执行,定时任务继续执行
              console.log(res.data.msg)
            } else {
              // 秒杀结束,无论成功失败,这个定时任务都结束
              clearInterval(this.t)
              this.t = null
              this.$message(res.data.msg)
            }

          })
        }, 2000)
      }).catch(res => {

      })
    }
  }

}
</script>

7.轮播图接口加入缓存

网站首页被访问的概率很高,瞬间的并发量会导致mysql不堪重负。并且首页的轮播图接口基本都是固定的几张图,修改的频率较小。
而redis正好弥补了这个缺点,可以承受较大的并发量。
现在的轮播图接口变成了:
1.轮播图请求来的时候先去缓存中查看,如果有,直接返回
2.如果没有,查数据库,然后将轮播图数据放到redis中,缓存起来
home/views.py:

from rest_framework.mixins import ListModelMixin
from django.core.cache import cache
from .serializer import BannerSerializer

class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    serializer_class = BannerSerializer
    def list(self, request, *args, **kwargs):
        banner_list = cache.get('banner_list')
        if banner_list:
            print('走了缓存')
            return APIResponse(data=banner_list)
        else:
            print('走了数据库')
            res = super().list(request, *args, **kwargs)
            cache.set('banner_list',res.data)
            return APIResponse(data=res.data)

settings/dev.py

# 首先需要安装django_redis模块
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 100}
            # "PASSWORD": "123",
        }
    }
}

可以看到第一次访问的是数据库,之后访问的都是缓存:
image
image

8.双写一致性

按照程序执行得流程我们第一次取值都回去数据库取,从第二次开始我们就需要在缓存中取。当我们新增几张轮播图(或者其他数据)时,我们并不一定会刷新页面,这样就造成了缓存没有办法及时将数据库中的内容同步到缓存。也就是mysql和redis(缓存数据库)的内容不一致,存在问题,这种现象叫双写不一致。
解决双写不一致的方法:
1.修改数据,删除缓存(rom操作之后用代码删除和更新)
2.修改数据库,更新缓存
3.定时更新缓存(实时性差,仅适用于数据重要性低的数据)。可以采用celery制作定时任务。
我们是用定时任务来将数据库同步到缓存中:
celery_task/user_task.py:

from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache

@app.task
def update_banner():
    banner_queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    ser = BannerSerializer(instance=banner_queryset, many=True)
    cache.set('banner_list', ser.data)
    print(ser.data)
    return True

celery_task/celery.py:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task',])

# 修改时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用utc时间
app.conf.enable_utc = False
# 任务的定时配置
app.conf.beat_schedule = {
    'update_banner':{
        'task': 'celery_task.user_task.update_banner',
        'schedule': timedelta(seconds=3),
    }
}

但是我们执行以上代码发现前端轮播图并不会显示,这是因为image字段中存储的路径在celery中不能补全:
image
image
修改后:
celery_task/celery.py:

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task',])

# 修改时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用utc时间
app.conf.enable_utc = False
# 任务的定时配置
app.conf.beat_schedule = {
    'update_banner':{
        'task': 'celery_task.user_task.update_banner',
        'schedule': timedelta(seconds=3),
    }
}

celery_task/user_task.py:

from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache
from django.conf import settings

@app.task
def update_banner():
    banner_queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
    ser = BannerSerializer(instance=banner_queryset, many=True)
    for item in ser.data:
        item['image'] = settings.BACKEND_URL+item['image']
        print(item.get('image'))
    cache.set('banner_list', ser.data)
    return True