这一篇主要实现用户注册和登录
编写接口并设置URL
main.py添加注册表单参数和简单注册接口
from fastapi import FastAPI from pydantic import BaseModel import models from db import engine, SessionLocal from fastapi.responses import JSONResponse models.Base.metadata.create_all(bind=engine) app = FastAPI() class RegisterForm(BaseModel): username: str password: str @app.post("/register") def register(user: RegisterForm): if SessionLocal().query(models.User).filter_by(username=user.username).count() > 0: return JSONResponse({ "code": 400, "msg": "用户已存在" }) db_user = models.User(username=user.username, password=user.password) with SessionLocal.begin() as session: session.add(db_user) return JSONResponse({ "code": 0, "msg": "注册成功" })
添加登录表单参数,编写简单登录接口
class LoginForm(BaseModel): username: str password: str @app.post("/login") def login(user:LoginForm): user = SessionLocal().query(models.User).filter_by(username = user.username).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) response = JSONResponse({ "code": 0, "msg": "登录成功" }) return response
之后使用JWT令牌(Token)和安全密码哈希(Hash)实现真正的安全机制
安装 python-jose和PassLib
pip install python-jose
pip install passlib
pip install bcrypt
编写auth.py,加入三个方法,分别是加密密码,校验加密密码,生成token
from jose import jwt from passlib.context import CryptContext SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def create_access_token(data: dict): to_encode = data.copy() encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
注册登录接口加上这些功能
@app.post("/register") def register(user: RegisterForm): if SessionLocal().query(models.User).filter_by(username=user.username).count() > 0: return JSONResponse({ "code": 400, "msg": "用户已存在" }) db_user = models.User(username=user.username, password=get_password_hash(user.password)) with SessionLocal.begin() as session: session.add(db_user) return JSONResponse({ "code": 0, "msg": "注册成功" }) class LoginForm(BaseModel): username: str password: str @app.post("/login") def login(form:LoginForm): user = SessionLocal().query(models.User).filter_by(username = form.username).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) if not verify_password(form.password, user.password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token( data={"sub": user.username} ) response = JSONResponse({ "code": 0, "msg": "登录成功", "access_token": access_token }) return response
auth.py编写通过JWTtoken获取当前用户
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") def get_current_user(token: str = Depends(oauth2_scheme)): payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") user = SessionLocal().query(models.User).filter_by(username = username).first() return user
新增用户信息接口验证逻辑
class UserRes(BaseModel): username: str nickname: str | None = None email: str | None = None isadmin: bool | None = False @app.get("/users/info", response_model=UserRes) async def read_users_me(current_user: UserRes = Depends(get_current_user)): return current_user
这样用户的注册和登录基本实现,可以进行更多后端接口逻辑的编写了。