Celery将任务分发到不同的队列,交给不同的Worker处理

发布时间 2023-10-04 12:16:46作者: 蕝戀

https://docs.celeryq.dev/en/stable/userguide/routing.html#routing-tasks

https://blog.csdn.net/wanglei_storage/article/details/130029916

https://www.cnblogs.com/yangjian319/p/9097171.html

celery配置:

task_routes = {
    # 将add任务放到add_queue队列中执行
    # key是任务函数的完整路径, 然后指定queue
    # key也可以是通配形式,比如app.users.*
    "apps.users.celery_test.tasks.add": {"queue": "add_queue"}
    
    # 其他的任务默认会丢给名为celery队列的worker进程处理。并不需要定义。
}

先启动flower监测工具,flower最先启动,不然已经分发的任务flower看不到的

celery.exe -A celery_tasks.main flower

启动worker进程

# -Q 是用来表示当前的Worker只处理名为add_queue的队列任务。
# -n 是给当前的worker进行命名,不然多个worker会出现名字冲突提示。
celery.exe -A celery_tasks.main worker -l INFO -P eventlet -Q add_queue -n celery@add_queue


# 再启动一个专门处理默认的celery队列的worker(也就是之前一直使用的方法)
celery.exe -A celery_tasks.main worker -l INFO -P eventlet -n celery@celery_queue

启动beat:

celery.exe -A celery_tasks.main beat -l INFO

进入flower的web界面查看:

调用异步任务时手动指定队列

class CeleryView(APIView):
    # authentication_classes = [AllowAny, ]
    permission_classes = [AllowAny, ]

    def get(self, request: Request):

        from .tasks import add
        # add方法默认是会被调度到add_queue队列中的
        # 现在手动指定调度到默认的celery队列中。
        add.apply_async(args=(1, 8), queue="celery")

        return Response("celery task")