flask,蓝图使用,g对象,数据库连接池

发布时间 2023-04-04 19:00:00作者: clever-cat

内容回顾

cbv使用

写一个类继承MethodView写get,post…

类属性decorators=[auth]可以加装饰器

CBV执行流程

跟django的执行流程一样

endpoint作用 路径别名

add_url_rule(view_func=IndexView.as_view(‘index’))

为什么endpoint不传,是被路由装饰器装饰的函数名:函数名._name_

装饰器的执行先后顺序,从上到下执行

模板语法

使用的是jinja2更加强大,兼容django的dtl模板语法

请求响应

请求:全局request对象,在不同视图函数中尽管使用,不会错乱

method,path,files,form

响应四件套:字符串,render_template,redirect,jsonify,响应对象,make_response包裹一下四件套之一,

设置cookie,set_cookies

响应对象.headers 响应头

session使用

设置密钥

全局导入,直接赋值使用,取值

session执行流程

请求来了走open_session前端携带cookie到后端,后端取出来cookie对于的value值,解密,转到session对象中,后续在视图函数中,使用sesson即可

save_session:请求走的时候,校验session有没有被改过了,如果改过了,删除cookie重新设置cookie

session用起来像字典,如何做,一个对象可以像字典一样使用,_getitem_,__setitem__只要调用_setitem__就说明了,对象属性modify,一开始false只要触发_setitem,置为true,后期只要判断modify,就可以判断session有没有被改过

闪现

跨请求获取到之前请求存放的数据,取一次就没了,关注django中的message框架

放值:flash('%s,我错了'%name)
取值:get_flashed_messages()

请求扩展

before_request 请求来的时候

after_request 请求走的使用

before_first_request 项目启动后第一个请求来触发

teardown_request:错误日志记录

errorhandler:某种状体码,就会执行,http的状态码1xx,2xx,3xx,4xx,5xx

内容详情

蓝图的使用

blueprint翻译过来的,称之为蓝图

作用是:之前全在一个py中写flask项目,后期肯定要划分目录

不用蓝图划分目录

no_blueprint_flask  # 项目名
    src             #核心源码位置
    	__init__.py # 包 里面实例化得到了app对象,
    	models.py   #放表模型
    	views.py    # 放视图函数
    static          # 放静态资源
    templates       # 放模板
    	home.html   # 模板
    manage.py       # 启动文件

蓝图的使用步骤

第一步:导入蓝图类

from flask import Blueprint

第二步:实例化得到蓝图对象

us=Blueprint('user',__name__)

第三步:在app中注册蓝图

app.register_blueprint(us)

第四步:在不同的views.py使用蓝图注册路由

@us.route('/login')

补充:蓝图可以有自己的静态文件和模板

补充:注册蓝图时,可以使用前缀,必须以/开头

使用蓝图,划分小型目录

little_blueprint              # 项目名
    -src                      # 核心代码
        -static               # 静态文件
            -1.jpg            # 图片
        -templates            # 模板文件
            -user.html            # 模板
        -views                # 视图函数存放位置
            -order.py         # 订单相关视图
            -user.py          # 用户相关视图
        -__init__.py          # 包
        -models.py            # 表模型
    -manage.py                # 启动文件

使用蓝图,划分大型项目目录,多个app,像django一样

big_blueprint  								# 项目名
    -src									# 核心文件
        -admin								# admin的app
        	-static							# 静态文件
        		-1.jpg						# 图片
        	-templates						# 模板文件目录
        		-admin_home.html			# 模板文件
        	-__init__.py					# 包
        	-models.py						# 表模型
        	-views.py						# 视图函数
        -home								# home app
        -order								# orderapp
        -__init__.py						# 包
        -settings.py						# 配置文件
    -manage.py								# 启动文件

g对象

g 对象 是什么

global的缩写,再python中是个关键字,不能以关键字作为变量名,干脆用了g

g 对象,再整个请求的全局,可以放值,可以取值

全局变量,再任意位置导入使用即可

他为什么不学django使用request作为上下文

因为使用request,可能会造成request数据的污染,不小心改了request的属性,但我们可能不知道

建议使用g 是空的放入之后再当次请求中全局优先

以后想在当次请求中,放入一些数据,后面使用,就可以使用g对象

g和session有什么区别

g 是只针对于当次请求

session针对于多次请求

from flask import Flask, g, request

app = Flask(__name__)
app.debug = True


@app.before_request
def before():
    if 'home' in request.path:
        g.xx = 'xx'


def add(a, b):
    # print('---',g.name)
    print('---', request.name)
    return a + b


@app.route('/')
def index():
    print(g.xx)
    name = request.args.get('name')
    # g.name = name
    request.method = name
    res = add(1, 2)
    print(res)
    return 'index'


@app.route('/home')
def home():
    print(g.xx)
    return 'index'


if __name__ == '__main__':
    app.run()

数据库连接池

flask操作pymysql

使用pymysql

再视图函数中,创建pymysql的连接,查数据,查完,返回给前端

有什么问题?来一个请求,创建一个连接,请求结束连接关闭(django就是这么做的)

把连接对象,做成全局的,再视图函数中,使用全局的连接,查询,返回给前端

有什么问题?会出现数据错乱

image-20230404120516198

测试全局连接结果数据错乱问题

import pymysql
import threading
import time

conn = pymysql.connect(user='root',
                       password="root",
                       host='127.0.0.1',
                       database='luffy01',
                       port=3306)

cursor = conn.cursor(pymysql.cursors.DictCursor)


def task_1():
    cursor.execute('select * from luffy_user')
    time.sleep(3)
    res = cursor.fetchall()
    print(res, "这是user表的数据")


def task_2():
    time.sleep(1)
    cursor.execute('select * from luffy_banner')
    time.sleep(5)
    res = cursor.fetchall()
    print(res, '这是banner表的数据')


if __name__ == '__main__':
    t1 = threading.Thread(target=task_1)
    t2 = threading.Thread(target=task_2)
    t1.start()
    t2.start()

image-20230404185228226

解决上面的两个问题

数据库连接池

创建一个全局的池

每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数

使用第三方数据库连接池,使用步骤

安装pip install dbutils

使用:实例化的到一个池对象

在视图函数中导入使用

from dbutils.pooled_db import PooledDB
import pymysql

pool = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=10,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root',
    database='luffy01',
    charset='utf8'
)

conn = pool.connection()

cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select * from luffy_banner limit 2')
res = cursor.fetchall()
print(res)

带池的代码

@app.route('/article_pool')
def article_pool():
    conn = pool.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute('select id,title,author_img from aritcle limit 2')
    res = cursor.fetchall()
    print(res)
    return jsonify(res)

不带池的代码

@app.route('/article')
def article():
    conn = pymysql.connect(user='root',
                           password="",
                           host='127.0.0.1',
                           database='cnblogs',
                           port=3306)
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    time.sleep(random.randint(1,3))
    cursor.execute('select id,title,author_img from aritcle limit 2')
    res = cursor.fetchall()
    cursor.close()
    conn.close()
    return jsonify(res)

压力测试

from threading import Thread
import requests


def task():
    res = requests.get('http://127.0.0.1:5000/article_pool')
    print(len(res.text))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=task)
        t.start()

效果是池的连接数明显小

不使用池连接数明显大

查看数据连接数

show status like 'Threads%'