Celery with FastAPI and TortoiseORM

发布时间 2023-06-08 15:21:07作者: waketzheng

The API server uses: fastapi + tortoise-orm + postgresql + redis + supervisor + nginx.
Some periodic tasks and async tasks run in Celery, with RabbitMQ as the broker.

Demo

  1. __init__.py
from .celery import app as celery_app  # NOQA

# Public API of the package: only the Celery app is exported.
__all__ = ("celery_app",)
  1. celery.py
import asyncio
import os
from pathlib import Path
from typing import Any, Coroutine

from celery import Celery
from celery.schedules import crontab
from celery.signals import worker_ready
from dotenv import load_dotenv

from .settings import CELERY_ROOT, broker_url

# Load variables from a .env file into the environment before reading them.
load_dotenv()

# Fail fast at import time when the FTP password is not configured.
try:
    FTP_PASSWD = os.environ["FTP_PASSWD"]
except KeyError as e:
    # The printed hint asks the operator to configure the environment
    # variable or the .env file first; the KeyError is then re-raised.
    print(e, "请先配置环境变量或.env文件")
    raise

CALLBACK_KEY_PREFIX = "celery__callback__"
ORDER_NUM_NOTIFIED_KEY = "__notify__order_num="
# Treated as the test server when the project directory is under /home/ubuntu.
IS_TEST_SERVER = str(CELERY_ROOT).startswith("/home/ubuntu")
# The Celery app is named after the directory that CELERY_ROOT points to.
host_name = CELERY_ROOT.name
app = Celery(host_name, broker=broker_url)
# The remaining configuration is read from the "<host_name>.settings" module.
app.config_from_object(f"{host_name}.settings")



def get_result(task_id: str) -> Any:
    """Return the execution result of the Celery task with the given ID."""
    async_result = app.AsyncResult(task_id)
    return async_result.result


def run_async(coro: Coroutine, initdb: bool = True) -> Any:
    """Run a coroutine to completion from synchronous code.

    Args:
        coro: The coroutine object to execute.
        initdb: Whether the coroutine needs database access. Database
            access is not supported here, so a true value (the default)
            always fails.

    Returns:
        Whatever ``coro`` returns.

    Raises:
        Exception: If ``initdb`` is true.
    """
    if initdb:
        # Celery was still on 4.x at the time and did not support async well,
        # so DB operations were done by sending HTTP requests to FastAPI.
        # Close the coroutine that will never run; otherwise Python emits
        # "RuntimeWarning: coroutine ... was never awaited" when it is
        # garbage-collected.
        coro.close()
        raise Exception("Not support db operation yet!")
    return asyncio.run(coro)


def run_raw_async(coro: Coroutine) -> Any:
    """Run a coroutine synchronously without initialising the database.

    Suitable for coroutines that perform no SQL operations.
    """
    outcome = run_async(coro, initdb=False)
    return outcome


@app.task
def do_query(mchid: str, order_num: str, cached_key: str) -> None:
    """Repeatedly query for the result."""
    pending_query = query_pay_result(mchid, order_num, cached_key)
    run_raw_async(pending_query)


@app.task
def do_transfer_notify(r: dict, cached_key: str) -> None:
    """Parse the callback notification data and forward it."""
    forwarding = transfer_notify(r, cached_key, is_notified=True)
    run_raw_async(forwarding)


def run_shell(cmd, echo: bool = True):
    """Run a shell command and return its output.

    When ``echo`` is true, the command is printed (prefixed with "--> ")
    before it is executed.
    """
    if echo:
        print("--> " + cmd)
    pipe = os.popen(cmd)
    try:
        output = pipe.read()
    finally:
        # Always release the pipe, mirroring what a ``with`` block would do.
        pipe.close()
    return output


@worker_ready.connect
def print_git_info(**kwargs):
    """Show which commit's code the Celery worker is running."""
    # supervisor is configured, so a plain print is enough
    # (logger.info would work as well).
    last_commit = run_shell("git log -1 --stat")
    print(last_commit)
    status = run_shell("git status", False)
    if "nothing to commit, working tree clean" in status:
        return
    # The working tree differs from the last commit: show the changes too.
    print("Code already changed after last commit...\n")
    print(run_shell("git diff"))


# Register the periodic (beat) schedule on the app.
app.conf.update(
    beat_schedule={
        "run-everyday": {
            # Task name: "<name of this file's directory>.celery.update_check_data".
            "task": f"{Path(__file__).parent.name}.celery.update_check_data",
            # Start the scheduled task every day at 09:20 (09:25 on the production server)
            "schedule": crontab(minute=20 if IS_TEST_SERVER else 25, hour=9),
        }
    }
)


@app.task
def update_check_data() -> None:
    """Update the reconciliation statement."""
    # Runs as a periodic task; it can also be triggered via xxx.delay().
    return None
  1. settings.py
from pathlib import Path

# Directory that contains this settings module; its name is used as the
# path component (virtual host) of the broker URL below.
CELERY_ROOT = Path(__file__).parent.resolve()
# celery
# NOTE(review): the broker credentials are hard-coded here; consider loading
# them from the environment instead of committing them.
broker_url = f"amqp://waket:123456@localhost:5672/{CELERY_ROOT.name}"
result_backend = "redis://localhost:6379/1"
task_serializer = "msgpack"
result_serializer = "json"
# Keep task results for one day. The lowercase Celery setting is
# ``result_expires``; ``task_result_expires`` is not a name Celery reads.
result_expires = 60 * 60 * 24
# Kept so that any code importing the old name keeps working.
task_result_expires = result_expires
accept_content = ["json", "msgpack"]

timezone = "Asia/Shanghai"
enable_utc = False

REDIS_HOST = "localhost"
REDIS_DB = 2
REDIS_URL = f"redis://{REDIS_HOST}/{REDIS_DB}"
  1. routers/foo.py
from <project_name>.celery import do_transfer_notify

@router.post("", include_in_schema=False)
async def fine_thank_you(request: Request):
    """Receive a notification, then start a Celery task that forwards it.

    Returns:
        A dict with the posted data, the ID of the queued Celery task
        (under ``rv``) and the client IP.
    """
    data = await request.json()
    # NOTE(review): do_transfer_notify is declared with a second required
    # parameter ``cached_key`` that is not passed here — confirm the
    # intended value and pass it.
    rv = do_transfer_notify.delay(data)
    # ``delay`` returns an AsyncResult object, which is not JSON-serializable;
    # return its task ID instead.
    return dict(data=data, rv=rv.id, ip=get_client_ip(request))