Flask-SQLAlchemy【3.1.1】使用 RESTful 风格

发布时间 2023-12-11 22:16:50 作者: __username

1.安装导入

from flask_sqlalchemy import SQLAlchemy

注册

# Create the extension instance; it is bound to the application further down.
db = SQLAlchemy()
app = Flask(__name__)
app.config.update(
    SECRET_KEY='mysecretkey123',
    # SQLite database; by default the file lives in the app's instance folder.
    SQLALCHEMY_DATABASE_URI="sqlite:///project.db",
)
# Bind the extension object to the application instance.
db.init_app(app)

增删改查、分页等操作

局部

# get /user
# @app.get("/user")
# def user_list():
#     q = db.select(User).order_by(User.id)
#     users = db.session.execute(q).scalars()
#     return {"message": "ok", "data": [user.json() for user in users]}



# GET /user (paginated), e.g. /user?page=1&per_page=10
@app.get("/user")
def user_list():
    """Return one page of users ordered by id.

    The ``page`` and ``per_page`` query parameters are read as integers and
    default to 1 and 10 respectively.
    """
    args = request.args
    stmt = db.select(User).order_by(User.id)
    current_page = db.paginate(
        stmt,
        page=args.get("page", default=1, type=int),
        per_page=args.get("per_page", default=10, type=int),
    )
    payload = [row.json() for row in current_page.items]
    return {"message": "ok", "data": payload}


# POST /user with a JSON body such as
# {"username": "alice", "email": "alice@example.com"}
@app.post("/user")
def create_user():
    """Create a user from the JSON request body.

    Returns:
        The serialized user on success. A 400 response when the body is not a
        JSON object or has no non-empty string ``username``. A 409 response
        when the insert violates a database constraint (``username`` is
        declared unique on the model).
    """
    # Local import keeps the snippet self-contained; SQLAlchemy is always
    # installed as a dependency of Flask-SQLAlchemy.
    from sqlalchemy.exc import IntegrityError

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"message": "request body must be a JSON object"}, 400

    username = data.get("username")
    if not isinstance(username, str) or not username:
        return {"message": "username is required"}, 400

    # The email column is nullable, so a missing email is stored as NULL.
    user = User(username=username, email=data.get("email"))
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # Discard the failed transaction so the session stays usable.
        db.session.rollback()
        return {"message": "username already exists"}, 409
    return {"message": "ok", "data": user.json()}



# GET /user/<uid>, e.g. /user/1
@app.get("/user/<int:uid>")
def user_detail(uid):
    """Return the user whose primary key is ``uid``; abort with 404 if absent."""
    found = db.get_or_404(User, uid)
    return dict(message="ok", data=found.json())


# DELETE /user/<uid>, e.g. /user/1
@app.delete("/user/<int:uid>")
def user_delete(uid):
    """Delete the user with primary key ``uid`` and return its representation.

    Aborts with 404 when no such user exists.
    """
    target = db.get_or_404(User, uid)
    # Serialize first so the response never reads from a deleted instance.
    snapshot = target.json()
    db.session.delete(target)
    db.session.commit()
    return {"message": "ok", "data": snapshot}


# PUT /user/<uid> with a JSON body such as
# {"username": "alice", "email": "alice@example.com"}
@app.put("/user/<int:uid>")
def change_user(uid):
    """Replace the username and email of the user with primary key ``uid``.

    Returns:
        The serialized user on success. A 400 response when the body is not a
        JSON object or has no non-empty string ``username`` (the column is
        NOT NULL, so it must never be overwritten with ``None``). A 409
        response when the update violates a database constraint
        (``username`` is declared unique on the model).
        Aborts with 404 when no such user exists.
    """
    # Local import keeps the snippet self-contained; SQLAlchemy is always
    # installed as a dependency of Flask-SQLAlchemy.
    from sqlalchemy.exc import IntegrityError

    user = db.get_or_404(User, uid)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"message": "request body must be a JSON object"}, 400

    username = data.get("username")
    if not isinstance(username, str) or not username:
        return {"message": "username is required"}, 400

    user.username = username
    # PUT replaces the resource: an omitted email clears the stored value.
    user.email = data.get("email")
    try:
        db.session.commit()
    except IntegrityError:
        # Discard the failed transaction so the session stays usable.
        db.session.rollback()
        return {"message": "username already exists"}, 409
    return {"message": "ok", "data": user.json()}



if __name__ == '__main__':
    import os

    # Create any missing tables before serving; create_all() leaves existing
    # tables untouched, so running it on every start is safe.
    with app.app_context():
        db.create_all()

    # The server listens on every interface, and the Werkzeug debugger allows
    # arbitrary code execution, so debug mode is opt-in via FLASK_DEBUG=1.
    app.run(debug=os.environ.get("FLASK_DEBUG") == "1", host='0.0.0.0')

完整

import os
from flask import Flask, render_template, request, redirect, url_for, session, flash
from markupsafe import escape
from tools.data_faker import generate_movie_data
from flask_sqlalchemy import SQLAlchemy

# Create the extension instance; it is bound to the application further down.
db = SQLAlchemy()
app = Flask(__name__)
# The session cookie is signed with this key, so take it from the environment
# when available; the literal is only a fallback for local experiments.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'mysecretkey123')
# SQLite database; by default the file lives in the app's instance folder.
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///project.db"
# Bind the extension object to the application instance.
db.init_app(app)


class User(db.Model):
    """User row with a unique, required username and an optional email."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String, unique=True, nullable=False)
    email = db.Column(db.String)

    def json(self):
        """Return the row as a dict with ``id``, ``username`` and ``email``."""
        exported = ("id", "username", "email")
        return {field: getattr(self, field) for field in exported}

@app.route('/')
def index():
    """Render the home page for a logged-in user, else redirect to login."""
    user_info = session.get('user_info')
    if user_info:
        # Static demo data shown on the home page.
        friends = [
            {'username': name, 'age': age}
            for name, age in (('John', 22), ('Mary', 21), ('Bob', 23))
        ]
        return render_template('index.html', user_info=user_info, friends=friends)
    return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and process its submission.

    A POST carrying the demo credentials stores the username in the session
    and redirects to the index page. Any other POST flashes an error and
    renders the form again, as does a plain GET (without the flash).
    """
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        if username == 'admin' and password == '123':
            # Flask's default session is a signed but unencrypted cookie, so
            # the password must not be stored in it; keep only the username.
            session['user_info'] = {'username': username}
            return redirect(url_for('index'))
        flash('username or password is incorrect')
    return render_template('login.html')


@app.route('/user/<username>')
def user(username):
    """Render the page for ``username`` with 30 generated movie entries.

    Redirects to the login page when no user is stored in the session.
    """
    user_info = session.get('user_info')
    if not user_info:
        return redirect(url_for('login'))
    context = {
        'user_info': user_info,
        'name': escape(username),
        'movies': [generate_movie_data() for _ in range(30)],
    }
    return render_template('user.html', **context)


@app.route('/logout')
def logout():
    """Drop the stored user, if any, and redirect to the login page."""
    session.pop('user_info', None)
    login_url = url_for('login')
    return redirect(login_url)


# get /user
# @app.get("/user")
# def user_list():
#     q = db.select(User).order_by(User.id)
#     users = db.session.execute(q).scalars()
#     return {"message": "ok", "data": [user.json() for user in users]}



# GET /user (paginated), e.g. /user?page=1&per_page=10
@app.get("/user")
def user_list():
    """Return one page of users ordered by id.

    The ``page`` and ``per_page`` query parameters are read as integers and
    default to 1 and 10 respectively.
    """
    args = request.args
    stmt = db.select(User).order_by(User.id)
    current_page = db.paginate(
        stmt,
        page=args.get("page", default=1, type=int),
        per_page=args.get("per_page", default=10, type=int),
    )
    payload = [row.json() for row in current_page.items]
    return {"message": "ok", "data": payload}


# POST /user with a JSON body such as
# {"username": "alice", "email": "alice@example.com"}
@app.post("/user")
def create_user():
    """Create a user from the JSON request body.

    Returns:
        The serialized user on success. A 400 response when the body is not a
        JSON object or has no non-empty string ``username``. A 409 response
        when the insert violates a database constraint (``username`` is
        declared unique on the model).
    """
    # Local import keeps the change self-contained; SQLAlchemy is always
    # installed as a dependency of Flask-SQLAlchemy.
    from sqlalchemy.exc import IntegrityError

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"message": "request body must be a JSON object"}, 400

    username = data.get("username")
    if not isinstance(username, str) or not username:
        return {"message": "username is required"}, 400

    # The email column is nullable, so a missing email is stored as NULL.
    user = User(username=username, email=data.get("email"))
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # Discard the failed transaction so the session stays usable.
        db.session.rollback()
        return {"message": "username already exists"}, 409
    return {"message": "ok", "data": user.json()}



# GET /user/<uid>, e.g. /user/1
@app.get("/user/<int:uid>")
def user_detail(uid):
    """Return the user whose primary key is ``uid``; abort with 404 if absent."""
    found = db.get_or_404(User, uid)
    return dict(message="ok", data=found.json())


# DELETE /user/<uid>, e.g. /user/1
@app.delete("/user/<int:uid>")
def user_delete(uid):
    """Delete the user with primary key ``uid`` and return its representation.

    Aborts with 404 when no such user exists.
    """
    target = db.get_or_404(User, uid)
    # Serialize first so the response never reads from a deleted instance.
    snapshot = target.json()
    db.session.delete(target)
    db.session.commit()
    return {"message": "ok", "data": snapshot}


# PUT /user/<uid> with a JSON body such as
# {"username": "alice", "email": "alice@example.com"}
@app.put("/user/<int:uid>")
def change_user(uid):
    """Replace the username and email of the user with primary key ``uid``.

    Returns:
        The serialized user on success. A 400 response when the body is not a
        JSON object or has no non-empty string ``username`` (the column is
        NOT NULL, so it must never be overwritten with ``None``). A 409
        response when the update violates a database constraint
        (``username`` is declared unique on the model).
        Aborts with 404 when no such user exists.
    """
    # Local import keeps the change self-contained; SQLAlchemy is always
    # installed as a dependency of Flask-SQLAlchemy.
    from sqlalchemy.exc import IntegrityError

    user = db.get_or_404(User, uid)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"message": "request body must be a JSON object"}, 400

    username = data.get("username")
    if not isinstance(username, str) or not username:
        return {"message": "username is required"}, 400

    user.username = username
    # PUT replaces the resource: an omitted email clears the stored value.
    user.email = data.get("email")
    try:
        db.session.commit()
    except IntegrityError:
        # Discard the failed transaction so the session stays usable.
        db.session.rollback()
        return {"message": "username already exists"}, 409
    return {"message": "ok", "data": user.json()}



if __name__ == '__main__':
    # Create any missing tables before serving; create_all() leaves existing
    # tables untouched, so running it on every start is safe.
    with app.app_context():
        db.create_all()

    # The server listens on every interface, and the Werkzeug debugger allows
    # arbitrary code execution, so debug mode is opt-in via FLASK_DEBUG=1.
    app.run(debug=os.environ.get("FLASK_DEBUG") == "1", host='0.0.0.0')



# 参考教程:https://www.bilibili.com/video/BV1Hh4y1j7jM/?spm_id_from=333.1007.top_right_bar_window_history.content.click&vd_source=5fed6e8a7e3ad9f10860bf7a4540ba71
# 参考教程:https://notes.zhengxinonly.com/posts/flask_sqlalchemy_3.0.html

测试数据

import requests
from faker import Faker  # faker制作假数据

# Fetch the second page of users (10 per page) from the running API. The
# timeout keeps the script from hanging forever if the server is unreachable.
with requests.get('http://192.168.1.104:5000/user?page=2&per_page=10', timeout=5) as response:
    print(response.json())

# Chinese-locale fake-data generator for the commented-out sample requests below.
faker = Faker('zh-CN')

# for i in range(90):
#     user = {
#         'username': faker.name(),
#         'email': faker.email()
#     }
#     # print(user)
#     with requests.post('http://192.168.1.104:5000/user', json=user) as response:
#         print(response.json())


# with requests.get('http://192.168.1.104:5000/user/64') as response:
#     print(response.json())


# with requests.delete('http://192.168.1.104:5000/user/1') as response:
#     # print(response.json())
#     print(response.text)


# user = {"id": 2, "username": "梁文", "email": "asdasda@qq.com"}
#
# with requests.put('http://192.168.1.104:5000/user/2', json=user) as response:
#     # print(response.json())
#     print(response.text)


如图