6.29 celery分布式异步任务框架

发布时间 2023-07-05 09:26:23作者: ranbo145

1.celery:分步式异步任务框架 

/1  异步任务

/2  延迟任务

/3  定时任务

/4  celery架构

  消息中间件(broker):消息队列:可以使用redis,rabbitmq
  任务执行单元(worker):执行单元 执行提交的任务
  任务执行结果存储(banckend):可以使用mysql,redis

/5 安装celery模块:

  cmd 中输入 -pip install Celery

  celery不支持win,所以想再win上运行,需要额外安装eventlet

/6  安装eventlet : pip3 install eventlet

   补充操作:退出虚拟环境  deactivate

          卸载安装的模块:pip3 uninstall celery

2.celery执行异步任务 

/1  新建py文件,注册celery任务

  # 导入Celery类
  from celery import Celery
  broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
  backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
  # 实例化得出对象
  app = Celery(__name__,broker=broker, backend=backend,)

  @app.task # 加上这个装饰器,让add函数变成了celery的任务
  def add(a, b):
    c = a + b
    return c

/2  新建文件,在文件中写入异步调用操作 

  # 1 异步调用 : 执行下面的函数,把任务提交消息队列中,并没有执行,等待work的执行
  # 返回给我们一个id号,我们之后通过id号来查结果
  res = add.delay(2,3)
  print(res) # 得到了一个ID号49e33190-3499-48a3-968b-5f1f59888011

/3  在文科路径下启动worker 

  cmd中进入到提交消息队列代码所在的路径中

  celery -A demo worker -l info -P eventlet

  启动worker和提交消息并没有先后顺序

/4  新建一个 文件来执行结果储存

  # 执行结果存储
  from demo import app
  # 从celery的包下
  from celery.result import AsyncResult

  id = '49e33190-3499-48a3-968b-5f1f59888011'
  if __name__ == '__main__':
  a = AsyncResult(id=id, app=app)
  if a.successful(): # 正常执行完成
  result = a.get() # 任务返回的结果
  print(result)
  elif a.failed():
  print('任务失败')
  elif a.status == 'PENDING':
  print('任务等待中被执行')
  elif a.status == 'RETRY':
  print('任务异常后正在重试')
  elif a.status == 'STARTED':
  print('任务已经开始被执行')

3.celery的包结构使用 

/1  创建包,在里面创建celery.py(名字固定)

  from celery import Celery
  broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
  backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis

  # 在括号中添加include参数,include的中括号中写入需要被app管理的文件,

  #  这样写可以免去在上面加装饰器进行管理
  app = Celery(__name__,broker=broker, backend=backend,
  include=['celery_task.user_task','celery_task.home_task','celery_task.course_task'])

/2 在包中新建任务文件

  例如user_task 

  # 新建任务文件
  # 需要在celery前面加点,不加点就表示从第三方包导入app
  # celery前面加点表示从当前路径下导入
  from .celery import app
  import time

  @app.task
  def send_sms(mobile,code):
  time.sleep(3)
  print('成功发送短信给%s,验证码为%s' % (mobile,code))
  return True

/3  在包的外边创建提交任务的代码 

  # 提交任务
  from celery_task.user_task import send_sms
  res = send_sms.delay(88888888,77777)
  print(res)

/4  启动worker

  启动worker
  要在包所在的路径下启动
  celery -A 包名 worker -l info -P eventlet    

/5  查看结果  

  # 查询结果
  from celery_task.celery import app
  # 从celery的包下
  from celery.result import AsyncResult

  id = '190ffcc5-9cb5-4781-978f-9d1949c68d86'
  if __name__ == '__main__':
  a = AsyncResult(id=id, app=app)
  if a.successful(): # 正常执行完成
  result = a.get() # 任务返回的结果
  print(result)
  elif a.failed():
  print('任务失败')
  elif a.status == 'PENDING':
  print('任务等待中被执行')
  elif a.status == 'RETRY':
  print('任务异常后正在重试')
  elif a.status == 'STARTED':
  print('任务已经开始被执行')

4.celery执行延时任务和定时任务

/1  定义:

  延时任务:等待一定的时间再执行任务

  定时任务:规定例如每天几点执行一次

/2  延时任务

  先导入异步任务的函数

  创建时间对象

  例:eta=datetime.utcnow() + timedelta(seconds=5)   #  datetime.utcnow() 拿取的是UTC时间(是个对象),timedelta(seconds=5)(变为对象)可以和上面拿到的UTC时间相加

  # 使用apply_async传参数加上时间对象
  res = send_sms.apply_async(kwargs={'mobile':88888888,'code':77777},eta=eta)

/3 定时任务 (写在celery文件中)

  # 配置时区
  app.conf.timezone = 'Asia/Shanghai'
  # 是否使用UTC
  app.conf.enable_utc = False  

  # 定时任务的配置
  from datetime import timedelta
  from celery.schedules import crontab
  app.conf.beat_schedule = {
    'send_sms': {
    'task': 'celery_task.user_task.send_sms',
    'schedule': timedelta(seconds=5),
    'args': ('99999999', 8888),
    },
    'obtain_app': {
    'task': 'celery_task.home_task.obtain_app',
    # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
    'schedule': crontab(hour=11, minute=38), # 每天11点35,执行
    'args': (),
    }
  }

5.django中使用celery  

/1  首先把建立的celery模块的包文件夹移动到根路径下

/2  增加新的任务文件banner_task,并添加新任务

  from .celery import app

  @app.task
  def add_banner():
    from home.models import Banner
    Banner.objects.create(title='测试', image='/1.png', link='/banner', info='xxx',orders=99)
    return 'banner增加成功'

/3  在celery.py 中加载django配置

  # 注:只有在需要用到django中的

  import os
  os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_01.settings.dev")

/4  视图函数中添加接口,导入任务,提交即可 

  class CeleryView(APIView):
    def get(self, request):
    res = add_banner.delay()
    return APIResponse(msg='新增banner的任务已经提交了')

/5 启动worker

  在根路径下启动就行

6.接口的缓存 

/1  双写一致性:

  缓存数据和数据库的数据不一致了

  写入数据库,删除缓存

  写入数据库,更新缓存

/2  在轮播图接口下添加方法 

  def list(self, request, *args, **kwargs):
  #  接口中接入缓存的逻辑
  1 先去缓存中查一下有没有数据
  2 如果有,直接返回,不走父类的list了(list在走数据库)
  3 如果没有,走父类list,查询数据库
  4 把返回的数据,放到缓存中

  data = cache.get('home_banner_list')
  if not data: # 缓存中没有
  print('走了数据库')
  res = super().list(request, *args, **kwargs) # 查询数据库
  # 返回的数据,放到缓存中
  data = res.data.get('data') # {code:100,msg:成功,data:[{},{}]}
  cache.set('home_banner_list', data)
  return APIResponse(data=data)