Python FastAPI 异步获取 Neo4j 数据

发布时间 2023-09-08 11:04:34作者: VipSoft

前提条件

改造

utils/neo4j_provider.py
增加了暴露给外面调用的属性,同时提供了同步和异步执行的驱动

#!/usr/bin/python3
import os

from neo4j import GraphDatabase, AsyncGraphDatabase, basic_auth, Driver, AsyncDriver
from settings import settings


# Neo4j 数据库操作类
class Neo4jProvider:
    """创建 Neo4j 数据库连接"""

    def __init__(self) -> None:
        # 获取环境变量值,如果没有就返回默认值
        self.url = settings.NEO4J_URI
        self.username = settings.NEO4J_USER
        self.password = settings.NEO4J_PASSWORD
        self.neo4j_version = settings.NEO4J_VERSION
        self.database = settings.NEO4J_DATABASE
        self.port = int(settings.NEO4J_PORT)

    # 同步驱动
    def driver(self) -> Driver:
        return GraphDatabase.driver(self.url, auth=basic_auth(self.username, self.password))

    # 异步驱动
    def async_driver(self) -> AsyncDriver:
        return AsyncGraphDatabase.driver(self.url, auth=basic_auth(self.username, self.password))


# 同步驱动。暴露给外面调用
driver = Neo4jProvider().driver()

# 异步驱动。暴露给外面调用
asyncDriver = Neo4jProvider().async_driver()

routers/node_router.py
添加一个查询数据的接口方法

#!/usr/bin/python3

import logging
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from utils.neo4j_provider import asyncDriver
from settings import settings

router = APIRouter()


# 定义一个根路由
@router.get("/add")
def add_node():
    # TODO 往 neo4j 里创建新的节点
    data = {
        'code': 0,
        'message': '',
        'data': 'add success'
    }
    return JSONResponse(content=data, status_code=status.HTTP_200_OK)


@router.route("/search")
async def get_search(q: str = None):
    if q is None:
        return []
    cql = ("""
                 MATCH (p:Person) WHERE p.name CONTAINS $name RETURN p
             """)
    records, _, _ = await asyncDriver.execute_query(
        cql,
        name="陈长兴",  # 将参数 q 传给cql 里的 $name 变量
        database_=settings.NEO4J_DATABASE,
        routing_="r",
    )
    for record in records:
        # 打印出 record 属性
        logging.info("%s, %s", record["p"]["name"], record["p"]["generation"])
    # 转成 json
    data = [serialize_person(record["p"]) for record in records]
    return JSONResponse(content=data, status_code=status.HTTP_200_OK)


def serialize_person(person):
    return {
        "id": person["id"],
        "name": person["name"],
        "generation": person["generation"],
        "votes": person.get("votes", 0)
    }

运行效果

http://127.0.0.1:8000/api/node/search?name=%E9%99%88%E9%95%BF%E5%85%B4
image