celery 的高级使用

发布时间 2023-11-21 09:42:08作者: Way*yy

celery包结构

celery_task # celery包
    celery.py # celery连接和配置相关文件,且名字必须叫celery.py
    home_tasks.py # home app的异步任务
    user_task.py # user app 的异步任务
    get_result.py # 查看异步任务
    add_task.py #启动异步任务
    __init__.py # 包文件
    
### 1 创建 celery_task  包,包内部有celery.py和一堆task-->['celery_task.home_task','celery_task.user_task']

### 2 celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存,用redis
app = Celery('app', broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])

###3 每个task,写自己相关的任务

### 4 启动worker
 celery -A celery_task worker -l info -P eventlet
    
### 5 提交任务
from celery_task.home_task import add
res=add.delay(3,4)
print(res)

### 6 查看结果
from celery_task.celery import app
from celery.result import AsyncResult

id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

使用celery异步发送框架

user_task.py

from .celery import app
from libs.tx_sms.sms import send_sms


# 使用celery异步框架提交验证码
@app.task
def sms_send(mobile, code):
    print('sms_send:>>>',mobile,code)
    response = send_sms(mobile, code)
    print(response)
    if response:
        return f"手机号{mobile}发送成功,验证码为{code}"
    else:
        return f"手机号{mobile}验证码发送失败,请重新尝试"

get_test.py

from celery_task.celery import app
from celery.result import AsyncResult

id = ''
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

views.py

    @action(methods=["POST"], detail=False)
    def send_sms(self, request, *args, **kwargs):
        """
            短信验证码接口
            路由:# http://127.0.0.1:8000/api/v1/user/mobile/send_sms/
        """
        mobile = request.data.get("mobile")
        code = sms.get_code(6)
        cache.set('cache_mobile_%s' % mobile, code, timeout=120)
        # 异步发送短信
        response = send_sms_celery.delay(mobile,code)
        print(response)
        return APIResponse(msg="短信已发送")

celery延迟任务和定时任务

异步任务

# 使用celery异步框架提交验证码
@app.task
def sms_send(self, mobile, code, ):
    response = sms.send_sms(mobile, code)
    if response:
        return f"手机号{mobile}发送成功,验证码为{code}"
    else:
        return f"手机号{mobile}验证码发送失败,请重新尝试"

延迟任务

@app.task
def send_sms(phone):
    time.sleep(2)
    return f'{phone}发送短信成功'

## 提交延迟任务   apply_async
# 添加延迟任务
from datetime import datetime, timedelta

print(datetime.utcnow())  # utc 时间,跟咱们差8个小时
# eta 就是 10s 后的实际
eta = datetime.utcnow() + timedelta(seconds=30)
res = send_sms.apply_async(args=(18953675221,), eta=eta)
print(res)

定时任务

#### 1 在celery.py中
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://127.0.0.1:6379/1'  # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2'  # 结果存储 redis

app = Celery(__name__, broker=broker, backend=backend, include=["celery_task.home_tasks", "celery_task.user_task"])
# 设置时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    'add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (5, 6),
    },

}


###2 启动worker
celery -A celery_task worker -l info -P eventlet
###3 启动beat(它来定时提交任务)
celery -A celery_task beat -l info

在Django中使用celery

# 通用方案

#1 把咱们之前写的包,放到项目路径下
#2 提交异步或延迟任务,导入直接提交即可
#3 只要启动worker,这些任务就会被执行
#4 如果要使用django中的东西(配置文件,缓存,orm。。。),都需要在celery.py中写
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
如果不写就会报错
#5 使用django内置东西的任务
@app.task
def cache_demo(key, value):
    cache.set(key, value)
    return '缓存成功'

接口缓存

# 1 首页获取轮播图接口--->去数据库查的--->每次来到首页,都要查一次---->轮播图变得很慢
# 2 咱们把轮播图数据,放到redis,做缓存
	-以后:只要缓存中有,就从缓存中拿(redis)
    -如果缓存中没有,从数据库查,查完再放到缓存中

# 3 查询所有的接口,都可以加缓存

轮播图接口加入缓存

from rest_framework.mixins import ListModelMixin
from django.core.cache import cache
from utils.common_response import APIResponse


class ListMixin(ListModelMixin):
    def list(self, request, *args, **kwargs):
        # 先从缓存中查看有没有banner
        banner_list = cache.get("banner")
        # 如果没有调用父类从数据库中查,并保存到缓存
        if not banner_list:
            banner = super().list(request, *args, **kwargs)
            cache.set("banner", banner.data)
            return APIResponse(result=banner.data)
        # 如果有直接返回出去
        return APIResponse(result=banner_list)

使用装饰器完成


双写一致性

# 1 接口加缓存---->mysql数据改了--->缓存数据没动--->数据不一致了
	- 有的数据,必须一致(缓存删除和修改,要在修改数据后)
        -修改,插入数据(mysql),删除缓存
        -修改,插入数据(mysql),修改缓存
        ----不合理的方案----
        删除缓存,再改数据
        
    - 有的数据,可以容忍--->实时性要求没有那么高
    	-定时更新  --->每隔5分钟更新一次
        
# 2 高级名:  双写一致性
当修改了数据库中的数据同时也需要修改缓存中的数据,如何保证数据库中和缓存中的数据一致,这就是双写一致性
解决方案:
    -改数据,删除缓存,该缓存
    -定时更新
        
# celery:发送短信,改成异步
# celery:定时更新轮播图缓存

django缓存过期时间

cache.set(self.cache_key, res.data, None)   # 永不过期

cache.set(self.cache_key, res.data)   #  不写5分钟过期

通过定时更新缓存,实现双写一致性

#####1 任务
from django.core.cache import cache
from home.models import Banner
from home.serializer import BannerSerializer
from settings.user_settings import BASE_URL, BANNER_COUNT
from celery_task.celery import app


@app.task
def update_banner():
    # 查询banner所有的数据
    banner = Banner.objects.filter(is_delete=False, is_show=True).order_by("orders")[:BANNER_COUNT]
    # 因为拿到的是Query对象,所以需要序列化
    serializer = BannerSerializer(banner, many=True)
    # 通过序列化循环出来序列化的数据
    for ser in serializer.data:
        ser["img"] = BASE_URL + "/" + ser["img"]
    cache.set('banner_list', ser.data)
    return '更新成功'

### 2 celery.py 
app.conf.beat_schedule = {
    'add': {
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': (),
    },

}

### 3 启动worker
celery -A celery_task worker -l info -P eventlet

##  4 启动beat
celery -A celery_task beat -l info

异步秒杀逻辑

# 1 分析流程--->提高了 并发量
	同步流程:用户在前端,点击秒杀按钮---->提交请求到后端---->[扣减库存,生成订单]假设耗时--->同步操作-->10s钟处理完成,秒杀成功--->返回给前端--->如果秒杀人数过多,同步操作,不能承载更多人同时秒杀
    异步流程:用户在前端,点击秒杀按钮---->提交请求到后端---->提交一个任务[扣减库存,生成订单]假设耗时--->异步操作-->10s钟处理完成,秒杀成功--->前端再发请求查询--->如果秒杀人数过多,异步操作,10s内能承载非常多用户操作

前端页面

<template>
  <div class="home">
    <Header></Header>
    <div style="padding: 50px;margin-left: 100px">

      <h1>Go语言课程</h1>
      <img src="http://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.png"
           height="300px"
           width="300px">
      <br>
      <el-button type="danger" @click="handleSeckill">秒杀课程</el-button>
    </div>

    <br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>
    <Footer></Footer>
  </div>
</template>
<script>
import Headers from "@/components/Headers.vue";
import Footer from "@/components/Footer.vue";

export default {
  name: "SeckillView",
  data() {
    return {
      seckill_id: '',
      t: ''
    }
  },
  methods: {
    handleSeckill() {
      this.$axios.post(this.$settings.BASE_URL + 'api/v1/home/seckill/seckill/', {
        course_id: 'go语言'
      }).then(res => {
        console.log(res.data)
        if (res.data.code === 100) {
          this.$message({
            message: res.data.msg,
            type: 'warning',
            duration: 1500
          });

          // 起一个定时任务,每隔2s向后端查询一次,看是否秒杀成功
          this.seckill_id = res.data.seckill_id
          this.t = setInterval(() => {
            this.$axios.get(this.$settings.BASE_URL + 'api/v1/home/seckill/get_seckill_result/?seckill_id=' + this.seckill_id).then(res => {
              if (res.data.code === 100 || res.data.code === 101) {
                alert(res.data.msg)
                clearInterval(this.t)
                this.t = null

              } else if (res.data.code === 102) { //秒杀逻辑还没开始执行
                this.$message('等待开始秒杀');
              } else if (res.data.code === 103) {
                this.$message('正在秒杀途中');
              }
            })

          }, 2000)

        } else {
          this.$message({
            message: '服务端异常,请联系系统管理员',
            type: 'warning',
            duration: 1500
          });
        }
      })
    }
  },
  components: {
    Headers, Footer
  }
}
</script>
<style scoped>

</style>

<style scoped>

</style>

router

import Seckills from "@/views/seckills.vue";

Vue.use(VueRouter)

const routes = [
  {
    path: '/seckill',
    name: 'seckills',
    component: Seckills
  }
]

views

class SeckillView(GenericViewSet):

    @action(methods=["POST"], detail=False)
    def seckill(self, request, *args, **kwargs):
        seckill_id = request.data.get("course_id")
        res = seckill_course.delay(seckill_id)
        return APIResponse(seckill_id=str(res), msg='秒杀任务已经提交')

    @action(methods=['GET'], detail=False)
    def get_seckill_result(self, request, *args, **kwargs):
        seckill_id = request.query_params.get('seckill_id')
        a = AsyncResult(id=seckill_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='恭喜您,秒杀成功')
            else:
                return APIResponse(code=101, msg='很遗憾,您没有秒到')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(code=102, msg='暂未轮到您')
        elif a.status == 'STARTED':
            print('任务已经开始被执行')
            return APIResponse(code=103, msg='正在秒杀,请稍后')
        else:
            return APIResponse(code=104, msg='服务端错误,秒杀失败')

异步任务

@app.task
def seckill_course(seckill_id):
    time.sleep(6)
    res = random.choices([106, 109])
    if res == "106":
        return True
    elif res == "109":
        return False
    else:
        return "系统错误,请联系管理员"