fastapi_oracle_example

发布时间 2023-04-22 20:33:28作者: 虎虎生威啊

/Users/song/codelearn/FastAPI-Oracle-main/main.py

import os, secrets, requests, json
from typing import List, Optional

from fastapi import Depends, FastAPI, HTTPException, Response, status, Path
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.status import HTTP_404_NOT_FOUND, HTTP_401_UNAUTHORIZED, HTTP_503_SERVICE_UNAVAILABLE

from sqlalchemy.orm import Session 
from sqlalchemy import MetaData, inspect
from sqlalchemy.sql import func

from app import model, schema, crud
from app.database import SessionLocal, engine

from dotenv import load_dotenv, find_dotenv

app = FastAPI()


def get_db():
    """Yield a database session and close it once the consumer is finished.

    Meant to be used as a FastAPI dependency via ``Depends(get_db)``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs even when the code using the session raised.
        session.close()

@app.on_event("startup")
async def start(
        db:   Session = Depends(get_db)
    ):
    """Create and seed the database tables on startup if they are missing.

    The presence of the ``customer`` table is used as the marker for "the
    schema already exists". When it is absent, all tables are created and a
    small set of sample rows is inserted.

    NOTE(review): event handlers are presumably called without dependency
    injection, so ``db`` would only hold the ``Depends`` default and is not
    used here; the parameter is kept for signature compatibility - confirm.
    """
    print("Starting up...")
    inspector = inspect(engine)
    # check if we created the tables in the database already. If not, create and populate them
    if inspector.has_table('customer'):
        print("Found the database tables")
        return

    print("Creating the tables in the database")
    model.LoyaltyLevel.metadata.create_all(engine)
    model.Customer.metadata.create_all(engine)
    model.Purchase.metadata.create_all(engine)

    # populate the tables with test data
    print("Populating the tables in the database")
    session = Session(engine)
    try:
        platinum = model.LoyaltyLevel(level_id="pl", description='Platinum', discount=25)
        gold = model.LoyaltyLevel(level_id="gl", description='Gold', discount=15)
        session.add_all([platinum, gold])

        customer_instance = model.Customer(
            firstname='John',
            lastname='Doe',
            date_of_birth=func.now(),
            level_id=platinum.level_id,
            signup_date=func.now(),
        )
        # Only the purchase is added explicitly; the customer is attached to it
        # through the relationship.
        purchase_instance = model.Purchase(customer=customer_instance, purchase_name="something")
        session.add_all([purchase_instance])
        session.commit()
    finally:
        # Previously the seeding session was never closed, leaking its connection.
        session.close()

@app.on_event("shutdown")
async def shutdown(db: Session = Depends(get_db)):
    """Drop the application's tables when the server shuts down."""
    print("Shutting down...")
    print("Dropping tables")
    # Same order as table creation: loyalty level, customer, purchase.
    for mapped_class in (model.LoyaltyLevel, model.Customer, model.Purchase):
        mapped_class.metadata.drop_all(engine)
    print("Tables dropped")
       
    



@app.get("/")
def read_root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"Hello": "World"}
    return greeting

# -- Customer --#

@app.get(
    "/customers",
    tags=["Customers"],
    response_model=List[schema.Customer],
    # Set form: these fields are left out of the serialized response.
    response_model_exclude={"date_of_birth", "signup_date"},
    # List form of the same option:
    # response_model_exclude=["date_of_birth", "signup_date"],
    # Useful if the response JSON is too big and we want to hide nulls:
    # response_model_exclude_none=True,
    summary="Gets all customers",
    response_description="A list containing all the customers",
)
def get_customers(
    db: Session = Depends(get_db),
):
    """Return all customers as provided by ``crud.get_customers``."""
    customers = crud.get_customers(db)
    return customers



@app.get(
    "/customer/{customer_id}",
    tags=["Customers"],  # a way to group api calls in the docs page
    response_model=List[schema.Customer],
    summary="Gets a single customer based on customer_id",
    response_description="A single customer based on the provided ID",
    responses={404: {"model": None, "description": "Customer ID not found"}},
)
def get_customer(
    customer_id: int = Path(
        ...,
        title="Customer ID",
        description="Customer unique indetifier",
        gt=0,
    ),
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Look up the customer rows matching ``customer_id``.

    Returns the result of ``crud.get_customer`` when it is truthy, otherwise a
    plain-text 404 response.
    """
    matches = crud.get_customer(db, customer_id)
    if matches:
        return matches

    return Response(
        'Customer not found',
        media_type="text/plain",
        status_code=HTTP_404_NOT_FOUND,
    )

@app.post("/customer/",
        tags=["Customers"], # a way to group api calls in the docs page
        # Respond with the full customer schema so the generated customer_id is
        # returned to the caller (the input schema has no customer_id field).
        response_model=schema.Customer,
        summary="Create a customer",
        response_description="Newly created customer",
        status_code = status.HTTP_201_CREATED
        )
def create_customer(customer: schema.CustomerInput,
                    db:   Session = Depends(get_db),
                    #,auth: bool    = Depends(is_authenticated)
                    ):
    """Create a customer after checking that its loyalty level exists.

    Args:
        customer: Validated request body without a ``customer_id``.
        db: Database session supplied by ``get_db``.

    Returns:
        The object returned by ``crud.create_customer``.

    Raises:
        HTTPException: 404 when no loyalty level matches ``customer.level_id``.
    """
    if not crud.get_loyalty_level_count(db, customer.level_id) > 0:
        raise HTTPException(status_code=404, detail=str(customer.level_id) + " is not a valid loyalty level id.")

    return crud.create_customer(db, customer)

@app.put(
    "/customer/",
    tags=["Customers"],
    response_model=schema.Customer,
    summary="Update a single customer",
    response_description="Updated the customer",
    status_code=status.HTTP_200_OK,
)
def update_customer(
    customer: schema.Customer,
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Update an existing customer.

    Raises a 404 ``HTTPException`` when the loyalty level is unknown or when
    ``crud.update_customer`` reports 404 for the customer.
    """
    level_count = crud.get_loyalty_level_count(db, customer.level_id)
    if level_count <= 0:
        raise HTTPException(status_code=404, detail=str(customer.level_id) + " is not a valid loyalty level.")

    outcome = crud.update_customer(db, customer)
    if isinstance(outcome, model.Customer):
        return outcome
    if outcome == 404:
        raise HTTPException(
            status_code=404,
            detail="Could not find a customer with key (customer_id=" + str(customer.customer_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )
    # Any other outcome falls through and returns None.

@app.delete(
    "/customer/{customer_id}",
    tags=["Customers"],
    response_model=schema.Customer,
    summary="Delete a single customer based customer_id - cascading (all associated records will be deleted)",
    response_description="Deleted the customer and all the associated records",
    status_code=status.HTTP_200_OK,
)
def delete_customer(
    customer_id: int = Path(
        ...,
        title="Customer ID",
        description="Customer unique indetifier",
        gt=0,
    ),
    db: Session = Depends(get_db),
):
    """Delete the customer identified by ``customer_id``.

    Returns the deleted customer, or raises a 404 ``HTTPException`` when
    ``crud.delete_customer`` reports 404.
    """
    outcome = crud.delete_customer(db, customer_id)
    if isinstance(outcome, model.Customer):
        return outcome
    if outcome == 404:
        raise HTTPException(
            status_code=404,
            detail="Could not find a customer with key (customer_id=" + str(customer_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )
    # Any other outcome falls through and returns None.


# -- Purchase --#

@app.get(
    "/purchases",
    tags=["Purchases"],
    response_model=List[schema.Purchase],
    summary="Gets all purchases",
    response_description="A list containing all the purchases",
)
def get_purchases(
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Return all purchases as provided by ``crud.get_purchases``."""
    purchases = crud.get_purchases(db)
    return purchases


@app.get(
    "/purchase/{purchase_id}",
    tags=["Purchases"],
    response_model=List[schema.Purchase],
    summary="Gets a single purchase based on purchase_id",
    response_description="A single purchase based on the provided ID",
    responses={404: {"model": None, "description": "Purchase ID not found"}},
)
def get_purchase(
    purchase_id: int = Path(
        ...,
        title="Purchase ID",
        description="Purchase unique indetifier",
        gt=0,
    ),
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Look up the purchase rows matching ``purchase_id``.

    Returns the result of ``crud.get_purchase`` when it is truthy, otherwise a
    plain-text 404 response.
    """
    matches = crud.get_purchase(db, purchase_id)
    if matches:
        return matches

    return Response(
        'No purchases found',
        media_type="text/plain",
        status_code=HTTP_404_NOT_FOUND,
    )

@app.get(
    "/purchases/{customer_id}",
    tags=["Purchases"],
    response_model=List[schema.Purchase],
    summary="Gets a list of purchases based on customer_id",
    response_description="A list of purchases based on the provided ID",
    responses={404: {"model": None, "description": "Customer ID not found"}}
)
# NOTE(review): this function reuses the name of the "/purchase/{purchase_id}"
# handler defined earlier in the module and therefore rebinds that module-level
# name. The name is kept unchanged here; consider renaming it.
def get_purchase(customer_id: int = Path(
                                        ...,
                                        # The metadata used to describe a purchase ID,
                                        # although this parameter is the customer ID.
                                        title="Customer ID",
                                        description="Customer unique identifier",
                                        gt=0
                                        ),
                db:   Session = Depends(get_db)
                #,auth: bool    = Depends(is_authenticated)
                ):
    """Return the purchases that belong to one customer.

    Args:
        customer_id: Positive ID of the customer whose purchases are listed.
        db: Database session supplied by ``get_db``.

    Returns:
        The result of ``crud.get_purchase_based_on_customer_id`` when it is
        truthy, otherwise a plain-text 404 response.
    """
    result = crud.get_purchase_based_on_customer_id(db, customer_id)

    if not result:
        return Response(
            'No purchases found',
            media_type="text/plain",
            status_code=HTTP_404_NOT_FOUND
        )
    return result

@app.post(
    "/purchases/",
    tags=["Purchases"],  # a way to group api calls in the docs page
    response_model=schema.Purchase,
    summary="Create a purchase",
    response_description="Newly created purchase",
    status_code=status.HTTP_201_CREATED,
)
def create_purchase(
    purchase: schema.PurchaseInput,
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated),
):
    """Create a purchase for an existing customer.

    Returns the created purchase, or raises a 404 ``HTTPException`` when
    ``crud.create_purchase`` reports 404.
    """
    outcome = crud.create_purchase(db, purchase)
    if isinstance(outcome, model.Purchase):
        return outcome
    if outcome == 404:
        raise HTTPException(
            status_code=404,
            detail="Integrity constrain violated. Parent key (customer_id=" + str(purchase.customer_id) + ") not found",
            headers={"X-Error": "Some error goes here"},
        )
    # Any other outcome falls through and returns None.

@app.put("/purchase/",
        tags=["Purchases"],
        response_model=schema.Purchase,
        summary="Update a single purchase",
        response_description="Updated the purchase",
        status_code = status.HTTP_200_OK
        )
def update_purchase(purchase: schema.Purchase,
                    db:   Session = Depends(get_db),
                    #,auth: bool    = Depends(is_authenticated)
                    ):
    """Update an existing purchase.

    Args:
        purchase: Validated request body including ``purchase_id`` and
            ``customer_id``.
        db: Database session supplied by ``get_db``.

    Returns:
        The updated purchase when ``crud.update_purchase`` returns one.

    Raises:
        HTTPException: 404 when ``crud.update_purchase`` reports 404.
    """
    result = crud.update_purchase(db, purchase)

    if isinstance(result, model.Purchase):
        return result
    if result == 404:
        raise HTTPException(
            status_code=404,
            # The message used to say "purchase card" and ended with an
            # unmatched closing parenthesis.
            detail="Could not find a purchase with key (purchase_id=" + str(purchase.purchase_id) + " and customer_id=" + str(purchase.customer_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )

@app.delete("/purchase/{purchase_id}",
        tags=["Purchases"],
        response_model=schema.Purchase,
        # The texts below used to mention "card_id", which is not a parameter here.
        summary="Delete a single purchase based on purchase_id",
        response_description="Deleted a single purchase based on purchase_id",
        status_code = status.HTTP_200_OK
        )
def delete_purchase(purchase_id: int = Path(
                                        ...,
                                        title="Purchase ID",
                                        description="Purchase unique indetifier",
                                        gt=0
                                        ),
                db:   Session = Depends(get_db)
                #,auth: bool    = Depends(is_authenticated)
                ):
    """Delete the purchase identified by ``purchase_id``.

    Args:
        purchase_id: Positive ID of the purchase to delete.
        db: Database session supplied by ``get_db``.

    Returns:
        The deleted purchase when ``crud.delete_purchase`` returns one.

    Raises:
        HTTPException: 404 when ``crud.delete_purchase`` reports 404.
    """
    result = crud.delete_purchase(db, purchase_id)

    if isinstance(result, model.Purchase):
        return result
    if result == 404:
        raise HTTPException(
            status_code=404,
            detail="Could not find a purchase with key (purchase_id=" + str(purchase_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )

# -- LoyaltyLevel --#

@app.get(
    "/loyalty_levels",
    tags=["LoyaltyLevels"],
    response_model=List[schema.LoyaltyLevel],
    summary="Gets all loyalty levels",
    response_description="A list containing all the loyalty levels",
)
def get_loyalty_levels(
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Return all loyalty levels as provided by ``crud.get_loyalty_levels``."""
    levels = crud.get_loyalty_levels(db)
    return levels

@app.get(
    "/loyalty_level/{level_id}",
    tags=["LoyaltyLevels"],
    response_model=List[schema.LoyaltyLevel],
    summary="Gets a single loyalty level based on level_id",
    response_description="A single loyalty level based on the provided ID",
    responses={404: {"model": None, "description": "Level ID not found"}},
)
def get_loyalty_level(
    level_id: str = Path(
        ...,
        title="Loyalty level ID",
        description="Unique loyalty level indetifier",
        max_length=2,
    ),
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Look up the loyalty level rows matching ``level_id``.

    Returns the result of ``crud.get_loyalty_level`` when it is truthy,
    otherwise a plain-text 404 response.
    """
    matches = crud.get_loyalty_level(db, level_id)
    if matches:
        return matches

    return Response(
        'Loyalty level not found',
        media_type="text/plain",
        status_code=HTTP_404_NOT_FOUND,
    )

@app.post(
    "/loyalty_level/",
    tags=["LoyaltyLevels"],
    response_model=schema.LoyaltyLevel,
    summary="Create a loyalty level",
    response_description="Newly created loyalty level",
    status_code=status.HTTP_201_CREATED,
)
def create_loyalty_level(
    loyalty_level: schema.LoyaltyLevel,
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Create a loyalty level and return what ``crud.create_loyalty_level`` gives back."""
    return crud.create_loyalty_level(db, loyalty_level)

@app.put(
    "/loyalty_level/",
    tags=["LoyaltyLevels"],
    response_model=schema.LoyaltyLevel,
    summary="Update a single loyalty level",
    response_description="Updated the loyalty level",
    status_code=status.HTTP_200_OK,
)
def update_loyalty_level(
    loyalty_level: schema.LoyaltyLevel,
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Update an existing loyalty level.

    Returns the updated loyalty level, or raises a 404 ``HTTPException`` when
    ``crud.update_loyalty_level`` reports 404.
    """
    # Disabled pre-check, kept for reference:
    # if not crud.get_loyalty_level_count(db, loyalty_level.level_id) > 0:
    #     raise HTTPException(status_code=404, detail=str(loyalty_level.level_id) + " is not a valid loyalty level.")

    outcome = crud.update_loyalty_level(db, loyalty_level)
    if isinstance(outcome, model.LoyaltyLevel):
        return outcome
    if outcome == 404:
        raise HTTPException(
            status_code=404,
            detail="Could not find a loyalty level with key (level_id=" + str(loyalty_level.level_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )
    # Any other outcome falls through and returns None.

@app.delete(
    "/loyalty_level/{level_id}",
    tags=["LoyaltyLevels"],
    response_model=schema.LoyaltyLevel,
    summary="Delete a single loyalty level based level_id",
    response_description="Deleted the loyalty level",
    status_code=status.HTTP_200_OK,
)
def delete_loyalty_level(
    level_id: str = Path(
        ...,
        title="Loyalty level ID",
        description="Unique loyalty level indetifier",
        max_length=2,
    ),
    db: Session = Depends(get_db),
    # ,auth: bool    = Depends(is_authenticated)
):
    """Delete the loyalty level identified by ``level_id``.

    Returns the deleted loyalty level, or raises a 404 ``HTTPException`` when
    ``crud.delete_loyalty_level`` reports 404.
    """
    outcome = crud.delete_loyalty_level(db, level_id)
    if isinstance(outcome, model.LoyaltyLevel):
        return outcome
    if outcome == 404:
        raise HTTPException(
            status_code=404,
            detail="Could not find a loyalty level with key (level_id=" + str(level_id) + ")",
            headers={"X-Error": "Some error goes here"},
        )
    # Any other outcome falls through and returns None.


/Users/song/codelearn/FastAPI-Oracle-main/app/database.py

import os, urllib
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine.url import URL
#import pyodbc
from dotenv import load_dotenv, find_dotenv

# Load environment variables from the .env file located by find_dotenv().
# The commented-out connection URLs below read DB_* settings from the environment.
load_dotenv(find_dotenv())

# ORACLE
# connect_url = URL(
#     "oracle+cx_oracle",
#     username=urllib.parse.quote_plus(str(os.environ.get('DB_USERNAME', 'DEFAULT_DB_USERNAME'))),
#     password=urllib.parse.quote_plus(str(os.environ.get('DB_PASSWORD', 'DEFAULT_DB_PASSWORD'))),
#     host=str(os.environ.get('DB_HOST', 'DEFAULT_DB_HOST')),
#     port=str(os.environ.get('DB_PORT', 'DEFAULT_DB_PORT')),
#     database=str(os.environ.get('DB_DATABASE', 'DEFAULT_DB_DATABASE')),
# )


# Azure SQL Server
# connect_url = URL.create(
#     "mssql+pyodbc",
#     username=urllib.parse.quote_plus(str(os.environ.get('DB_USERNAME', 'DEFAULT_DB_USERNAME'))),
#     password=urllib.parse.quote_plus(str(os.environ.get('DB_PASSWORD', 'DEFAULT_DB_PASSWORD'))),
#     host=urllib.parse.quote_plus(str(os.environ.get('DB_HOST', 'DEFAULT_DB_HOST'))),
#     database=str(os.environ.get('DB_DATABASE', 'DEFAULT_DB_DATABASE')),
#     query={
#         "driver": "ODBC Driver 17 for SQL Server"
#     },
# )


# engine = create_engine(connect_url, max_identifier_length=128)

# Local SQLite file used instead of the Oracle / SQL Server URLs above.
connect_url = 'sqlite:///test.db'

# SQLite connections refuse to be used from a thread other than the one that
# created them unless check_same_thread is disabled. FastAPI runs synchronous
# dependencies and endpoints on worker threads, so a session's connection can be
# touched by more than one thread. The option is SQLite-specific, hence the guard.
connect_args = {"check_same_thread": False} if connect_url.startswith("sqlite") else {}

engine = create_engine(connect_url, connect_args=connect_args)
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class shared by all ORM models.
Base = declarative_base()


/Users/song/codelearn/FastAPI-Oracle-main/app/__init__.py


/Users/song/codelearn/FastAPI-Oracle-main/app/model.py

from numbers import Number
from sqlalchemy import Sequence, Boolean, Column, ForeignKey, Integer, String, Date, Float
from sqlalchemy.orm import relationship
from .database import Base, engine

class LoyaltyLevel(Base):
    """ORM mapping for the ``loyalty_level`` table."""

    __tablename__ = "loyalty_level"
    # __table_args__ = {'schema': 'db_schema_name'}

    # Primary key, at most two characters long.
    level_id = Column(String(2), primary_key=True)
    description = Column(String(100))
    discount = Column(Integer)
    # Customers on this level; counterpart of Customer.loyalty_level.
    customer = relationship("Customer", back_populates="loyalty_level")
    
# one to many relationship
class Purchase(Base):
    """ORM mapping for the ``purchase`` table."""

    __tablename__ = "purchase"
    # __table_args__ = {'schema': 'db_schema_name'}

    purchase_id = Column(Integer, Sequence('purchase_id_seq'), primary_key=True)
    # Required reference to the owning customer; the foreign key is declared
    # with ON DELETE CASCADE.
    customer_id = Column(
        Integer,
        ForeignKey('customer.customer_id', ondelete="CASCADE"),
        nullable=False,
    )
    purchase_name = Column(String(100))
    purchase_date = Column(Date)
    # many-to-one; counterpart of Customer.purchase.
    customer = relationship("Customer", back_populates="purchase")

class Customer(Base):
    """ORM mapping for the ``customer`` table."""

    __tablename__ = "customer"
    # __table_args__ = {'schema': 'db_schema_name'}

    customer_id = Column(Integer, Sequence('customer_id_seq'), primary_key=True)
    firstname = Column(String(100))
    lastname = Column(String(100))
    date_of_birth = Column(Date)
    level_id = Column(String(2), ForeignKey('loyalty_level.level_id'))
    signup_date = Column(Date)
    # many-to-one: each customer points at a single loyalty level (scalar attribute).
    loyalty_level = relationship(
        "LoyaltyLevel",
        back_populates="customer",
        uselist=False,
    )
    # one-to-many: purchases of this customer. passive_deletes=True is set
    # alongside the ON DELETE CASCADE foreign key declared on Purchase.customer_id.
    purchase = relationship(
        "Purchase",
        back_populates="customer",
        cascade="all, delete",
        passive_deletes=True,
    )


/Users/song/codelearn/FastAPI-Oracle-main/app/crud.py

from sqlalchemy.orm import Session
from sqlalchemy import func, or_, exc
from fastapi.encoders import jsonable_encoder
from . import model, schema

# -- Customer --#

def get_customers(db: Session):
    """Return a list of all ``model.Customer`` rows."""
    customer_query = db.query(model.Customer)
    return customer_query.all()

def get_customer(db: Session, customer_id: int):
    """Return a list of the customers whose ``customer_id`` matches.

    The list is empty when no row matches.
    """
    id_matches = model.Customer.customer_id == customer_id
    return db.query(model.Customer).filter(id_matches).all()

def create_customer(db: Session, customer: schema.CustomerInput):
    """Insert a new customer built from ``customer`` and return the stored row."""
    # Equivalent shortcut: model.Customer(**customer.dict())
    new_customer = model.Customer(
        firstname=customer.firstname,
        lastname=customer.lastname,
        date_of_birth=customer.date_of_birth,
        level_id=customer.level_id,
        signup_date=customer.signup_date,
    )
    db.add(new_customer)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(new_customer)
    return new_customer
 
def update_customer(db: Session, customer: schema.Customer):
    """Overwrite the stored customer with the values in ``customer``.

    Returns the customer object on success, or the integer 404 when no
    customer has the given ``customer_id``.
    """
    current = db.query(model.Customer).filter(
        model.Customer.customer_id == customer.customer_id
    ).first()
    if not current:
        return 404

    new_values = customer.dict()
    db.query(model.Customer).filter(
        model.Customer.customer_id == current.customer_id
    ).update(new_values)
    db.commit()
    return current

def delete_customer(db: Session, customer_id: int):
    """Delete the customer with ``customer_id``.

    Returns the customer object that was found before deletion, or the integer
    404 when no such customer exists.
    """
    current = db.query(model.Customer).filter(
        model.Customer.customer_id == customer_id
    ).first()
    if not current:
        return 404

    db.query(model.Customer).filter(
        model.Customer.customer_id == customer_id
    ).delete()
    db.commit()
    return current

# -- LoyaltyLevel --#

def get_loyalty_levels(db: Session):
    """Return a list of all ``model.LoyaltyLevel`` rows."""
    level_query = db.query(model.LoyaltyLevel)
    return level_query.all()

def get_loyalty_level(db: Session, level_id: str):
    """Return a list of the loyalty levels whose ``level_id`` matches.

    Args:
        db: Active database session.
        level_id: Loyalty level key. It is a string: the column is declared
            as ``String(length=2)`` (the previous ``int`` hint was wrong).

    Returns:
        A list of ``model.LoyaltyLevel`` rows, empty when nothing matches.
    """
    return db.query(model.LoyaltyLevel).filter(
        model.LoyaltyLevel.level_id == level_id
    ).all()

def get_loyalty_level_count(db: Session, level_id: str):
    """Count the loyalty levels whose ``level_id`` matches.

    Args:
        db: Active database session.
        level_id: Loyalty level key. It is a string: the column is declared
            as ``String(length=2)`` (the previous ``int`` hint was wrong).

    Returns:
        The number of matching rows.
    """
    return db.query(model.LoyaltyLevel).filter(
        model.LoyaltyLevel.level_id == level_id
    ).count()

def create_loyalty_level(db: Session, loyalty_level: schema.LoyaltyLevel):
    """Insert a new loyalty level built from ``loyalty_level`` and return the stored row."""
    field_values = loyalty_level.dict()
    new_level = model.LoyaltyLevel(**field_values)
    db.add(new_level)
    db.commit()
    # Reload the row from the database after the commit.
    db.refresh(new_level)
    return new_level

def update_loyalty_level(db: Session, loyalty_level: schema.LoyaltyLevel):
    """Overwrite the stored loyalty level with the values in ``loyalty_level``.

    Returns the loyalty level object on success, or the integer 404 when no
    loyalty level has the given ``level_id``.
    """
    current = db.query(model.LoyaltyLevel).filter(
        model.LoyaltyLevel.level_id == loyalty_level.level_id
    ).first()
    if not current:
        return 404

    new_values = loyalty_level.dict()
    db.query(model.LoyaltyLevel).filter(
        model.LoyaltyLevel.level_id == current.level_id
    ).update(new_values)
    db.commit()
    return current

def delete_loyalty_level(db: Session, level_id: str):
    """Delete the loyalty level with ``level_id``.

    Args:
        db: Active database session.
        level_id: Loyalty level key. It is a string: the column is declared
            as ``String(length=2)`` (the previous ``int`` hint was wrong).

    Returns:
        The loyalty level object that was found before deletion, or the
        integer 404 when no such loyalty level exists.
    """
    existing_loyalty_level = db.query(model.LoyaltyLevel).filter(model.LoyaltyLevel.level_id == level_id).first()
    if not existing_loyalty_level:
        return 404

    db.query(model.LoyaltyLevel).filter(model.LoyaltyLevel.level_id == level_id).delete()
    db.commit()
    return existing_loyalty_level

# -- Purchase --#

def get_purchases(db: Session):
    """Return a list of all ``model.Purchase`` rows."""
    purchase_query = db.query(model.Purchase)
    return purchase_query.all()

def get_purchase(db: Session, purchase_id: int):
    """Return a list of the purchases whose ``purchase_id`` matches.

    The list is empty when no row matches.
    """
    id_matches = model.Purchase.purchase_id == purchase_id
    return db.query(model.Purchase).filter(id_matches).all()

def get_purchase_based_on_customer_id(db: Session, customer_id: int):
    """Return a list of the purchases whose ``customer_id`` matches.

    The list is empty when no row matches.
    """
    owner_matches = model.Purchase.customer_id == customer_id
    return db.query(model.Purchase).filter(owner_matches).all()

def create_purchase(db: Session, purchase: schema.PurchaseInput):
    """Insert a new purchase built from ``purchase``.

    Returns the stored purchase, or the integer 404 when the database raises
    an ``IntegrityError`` (the transaction is rolled back in that case).
    """
    new_purchase = model.Purchase(**purchase.dict())
    try:
        db.add(new_purchase)
        db.commit()
        db.refresh(new_purchase)
    except exc.IntegrityError:
        db.rollback()
        return 404
    return new_purchase

def update_purchase(db: Session, purchase: schema.Purchase):
    """Overwrite the stored purchase with the values in ``purchase``.

    The purchase is looked up by both ``customer_id`` and ``purchase_id``.
    Returns the purchase object on success, or the integer 404 when no row
    matches both keys.
    """
    current = db.query(model.Purchase).filter(
        model.Purchase.customer_id == purchase.customer_id,
        model.Purchase.purchase_id == purchase.purchase_id,
    ).first()
    if not current:
        return 404

    new_values = purchase.dict()
    db.query(model.Purchase).filter(
        model.Purchase.purchase_id == current.purchase_id
    ).update(new_values)
    db.commit()
    return current

def delete_purchase(db: Session, purchase_id: int):
    """Delete the purchase with ``purchase_id``.

    Returns the purchase object that was found before deletion, or the integer
    404 when no such purchase exists.
    """
    current = db.query(model.Purchase).filter(
        model.Purchase.purchase_id == purchase_id
    ).first()
    if not current:
        return 404

    db.query(model.Purchase).filter(
        model.Purchase.purchase_id == current.purchase_id
    ).delete()
    db.commit()
    return current

  

/Users/song/codelearn/FastAPI-Oracle-main/app/schema.py

from ast import LtE, Num
from typing import List, Optional
from unicodedata import numeric
from fastapi import Query
from pydantic import BaseModel
from datetime import date

class LoyaltyLevel(BaseModel):
    """Request/response schema for a loyalty level.

    ``orm_mode`` is enabled so instances can be built from ORM objects.
    """

    # Required key, at most two characters.
    level_id: str = Query(
        ...,
        title="Loyalty level ID",
        description="Loyalty level ID",
        max_length=2,
    )
    # Optional free-text description, at most 100 characters.
    description: Optional[str] = Query(
        None,
        title="Loyalty level description",
        description="The description of the Loyalty level",
        max_length=100,
    )
    # Discount percentage, defaults to 0. The upper bound was previously passed
    # as ``lte=100``, which is not a validation keyword and was therefore never
    # enforced; ``le`` is the "less than or equal" constraint.
    discount: Optional[int] = Query(
        0,
        title="Loyalty discount percentage",
        description="Loyalty discount percentage",
        le=100
    )

    class Config:
        orm_mode = True

class Customer(BaseModel):
    """Full customer schema, including the ``customer_id`` key.

    ``orm_mode`` is enabled so instances can be built from ORM objects.
    """

    # Required, strictly positive key.
    customer_id: int = Query(
        ...,
        gt=0,
        title="Customer ID",
        description="The ID of the customer",
    )
    firstname: Optional[str] = Query(
        None,
        max_length=100,
        title="Customer's first name",
        description="The first name of the customer",
    )
    lastname: Optional[str] = Query(
        None,
        max_length=100,
        title="Customer's last name",
        description="The last name of the customer",
    )
    date_of_birth: Optional[date] = Query(
        None,
        title="Date of birth",
        description="Customer's date of birth",
    )
    # Required loyalty level key, at most two characters.
    level_id: str = Query(
        ...,
        max_length=2,
        title="Loyalty level ID",
        description="Loyalty level ID",
    )
    signup_date: Optional[date] = Query(
        None,
        title="Sign up date",
        description="Customer's sign up date",
    )

    class Config:
        orm_mode = True
    
# A copy of the Customer model without customer_id, used to validate POST requests.
# The key (customer_id) does not need to be specified when creating a customer, since it comes from a sequence.
class CustomerInput(BaseModel):
    """Customer schema without ``customer_id``.

    ``orm_mode`` is enabled so instances can be built from ORM objects.
    """

    firstname: Optional[str] = Query(
        None,
        max_length=100,
        title="Customer's first name",
        description="The first name of the customer",
    )
    lastname: Optional[str] = Query(
        None,
        max_length=100,
        title="Customer's last name",
        description="The last name of the customer",
    )
    date_of_birth: Optional[date] = Query(
        None,
        title="Date of birth",
        description="Customer's date of birth",
    )
    # Required loyalty level key, at most two characters.
    level_id: str = Query(
        ...,
        max_length=2,
        title="Loyalty level ID",
        description="Loyalty level ID",
    )
    signup_date: Optional[date] = Query(
        None,
        title="Sign up date",
        description="Customer's sign up date",
    )

    class Config:
        orm_mode = True

class Purchase(BaseModel):
    """Full purchase schema, including the ``purchase_id`` key.

    ``orm_mode`` is enabled so instances can be built from ORM objects.
    """

    # Required, strictly positive key.
    purchase_id: int = Query(
        ...,
        gt=0,
        title="Purchase ID",
        description="The ID of the purchase",
    )
    # Required, strictly positive reference to the owning customer.
    customer_id: int = Query(
        ...,
        gt=0,
        title="Customer ID FK",
        description="Customer ID FK",
    )
    purchase_name: Optional[str] = Query(
        None,
        max_length=100,
        title="Purchase name",
        description="The name of the purchase",
    )
    purchase_date: Optional[date] = Query(
        None,
        title="Purchase date",
        description="The date of the purchase",
    )

    class Config:
        orm_mode = True

class PurchaseInput(BaseModel):
    """Purchase schema without ``purchase_id``.

    ``orm_mode`` is enabled so instances can be built from ORM objects.
    """

    # Required, strictly positive reference to the owning customer.
    customer_id: int = Query(
        ...,
        gt=0,
        title="Customer ID FK",
        description="Customer ID FK",
    )
    purchase_name: Optional[str] = Query(
        None,
        max_length=100,
        title="Purchase name",
        description="The name of the purchase",
    )
    purchase_date: Optional[date] = Query(
        None,
        title="Purchase date",
        description="The date of the purchase",
    )

    class Config:
        orm_mode = True
        

FastAPI Oracle CRUD application

Introduction

This is a simple Python FastAPI test application that works with an Oracle database.

Summary

The goal of this project was to learn a bit more about FastAPI, SQLAlchemy, Oracle Database and Docker.
Once 'HelloWorld' started working, I wanted to experiment with something a bit more complex: specifically, a CRUD API that handles foreign keys and one-to-one and one-to-many cascading relationships, using SQLAlchemy with an Oracle database behind it. Finally, I wanted to run the whole thing in Docker.

Database diagram

Database Diagram

Create database tables

The app will automatically create all 3 tables with sample data in the database on startup and will automatically drop them on shutdown.

Installing and running in Docker

Tested on OS X 12.2

Assumptions

  • Python 3.8 or higher is installed
  • Docker is installed

After checking out the repo, in terminal, cd to the project root directory

Build Docker image using the Dockerfile.oracle file

You will need to provide db_username, db_password, db_host, db_port and db_database as arguments when building the Docker image

docker build -f ./Dockerfile.oracle -t fastapi-oracle-image . \
--build-arg DB_USERNAME="<db_username>" \
--build-arg DB_PASSWORD="<db_password>" \
--build-arg DB_HOST="<db_host>" \
--build-arg DB_PORT="<db_port>" \
--build-arg DB_DATABASE="<db_database>"

Running the server instance in Docker

Run the newly created Docker image in a fastapi-oracle-container container

docker run -d --name fastapi-oracle-container -p 80:80 fastapi-oracle-image

Go to http://0.0.0.0:80/docs

To stop the container

docker stop fastapi-oracle-container

Installing and running in VirtualEnv (for development)

Tested on OS X 12.2

Assumptions

https://www.oracle.com/database/technologies/instant-client/downloads.html

After checking out the repo, in terminal, cd to the project root directory

Create virtual environment for the app

python3 -m venv .

Activate the virtual environment

source ./bin/activate

Install required packages from requirements.txt

pip install --no-cache-dir --upgrade -r ./requirements.txt

Set database connection configuration

Rename the template.env file to .env

mv template.env .env

Edit the .env file in the project folder

emacs .env

By filling in the following variables:

DB_USERNAME=""
DB_PASSWORD=""
DB_HOST=""
DB_PORT=
DB_DATABASE=""

Running the server instance in VirtualEnv

Start the app

uvicorn main:app --reload

Go to http://127.0.0.1:8000/docs