请求扩展,蓝图 , flask-session,数据库连接池,wtforms ,flask-script,信号,flask-cache

发布时间 2023-11-20 17:49:49作者: 毓见

1 请求扩展?

#1  before_request:请求来了会走,依次从上往下执行,但是如果其中一个返回了 响应对象 ,后续的就不走了,视图函数也不走而来
#2  after_arequest:请求走了,会从下往上依次执行,它必须返回 响应对象

假设 :
	写了3个before_request    第二个返回了  响应对象
    写了3个after_arequest    所有的after_arequest都会执行,从下往上
    
# 使用场景:
	-before_request 做一些拦截,判断
    	比如校验请求头中是否带token,user-agent。。。
    -after_arequest 在响应中加入
    	-响应头中
        -响应cookie
        -修改响应体。。。。

#3  before_first_request  老版本:新版本无了   项目启动后,第一次,以后再也不执行了


# 4 teardown_request
每一个请求之后绑定一个函数,即使遇到了异常
@app.teardown_request
def lqz(err):
    print(err) # 如果视图函数正常顺利运行,err是None的,如果视图函数出错了,err就是错误对象,一般用来做日志记录
    print('teardown_request')
    
    
    
# 5 errorhandler
路径不存在时404,服务器内部错误500

@app.errorhandler(404)  # 路径不存在
def error(arg):
    print('xxx')
    print(arg)
    return render_template('404.html')


@app.errorhandler(500)  # 服务器内部错误
def error(arg):
    return render_template('500.html')



# 6 template_global
#标签

@app.template_global()
def sb(a1, a2):
    return a1 + a2
#{{sb(1,2)}}
#7 template_filter
# 过滤器

@app.template_filter()
def db(a1, a2, a3):
    return a1 + a2 + a3
#{{ 1|db(2,3)}}

2 蓝图 ?

# blurprint 翻译过来的:flask是要做成项目,放到多个文件中,使用蓝图划分目录


# 不使用蓝图划分目录
如下图

### 使用蓝图,划分目录
	-小型项目
    -大型项目

# 蓝图使用步骤:
	1 在不同的视图函数中初始化蓝图
    	bp_user = Blueprint('user', __name__)
    2 以后注册路由,写请求扩展,统一都用蓝图做(局部的,如果使用app写请求扩展,这是全局的)
        @bp_user.before_request
        def before():
            print('user的---》before')
        @bp_user.route('/login')
        def login():
    3 在app中注册蓝图
    	app.register_blueprint(bp_home)


3 flask-session ?

# 之前用过flask的session---》加密后,放到了cookie中
# 我们想把session放到redis中 ---》django的session放到djagno-session表中

# 借助于第三方:flask-session  可以放在数据库中,文件中,redis中 以redis为例
pip install flask-session

##### 方式一
# from flask_session import RedisSessionInterface
# import redis
# conn = redis.Redis(host='127.0.0.1', port=6379)
# app.session_interface = RedisSessionInterface(conn, 'session')

##### 方式二:(继承第三方 ,通用方案:第三方提供的一个类,把app包裹一下,这个第三方就能用了)
from flask_session import Session
from redis import Redis

app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port='6379')
# app.config['SESSION_KEY_PREFIX'] = 'lqz'  # 如果不写,默认以:SESSION_COOKIE_NAME 作为key
# app.config.from_pyfile('./settings.py')
Session(app)  # 核心跟第一种方式一模一样





# 两个点
	- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')
    
    - session的key理应该是 uuid,但是咱们还是三段式--》之前浏览器器中存在了

4 数据库连接池 ?

4.1 flask使用pymysql

@app.route('/article')
def article():
    import pymysql
    from pymysql.cursors import DictCursor
    conn = pymysql.connect(user='root',
                           password="lqz123?",
                           host='127.0.0.1',
                           database='cnblogs',
                           port=3306)
    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    # 切换
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})
# 存在的问题
1 原生pymysql操作,最好有个orm----》后面  ---》sqlalchemy
2 并发问题:conn和cursor 要做成单例,还是每个视图函数一个?
    -如果用单例,数据会错乱
    -咱们需要,每个视图函数,拿一个连接---》如果并发数过多,mysql连接数就很多---》使用连接池解决

        
# 解决上面的两个问题
	-数据库连接池
    -创建一个全局的池
    -每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数
    
    
    
# 使用第三方数据库连接池,使用步骤
	-1 安装 pip install dbutils
    -2 使用:实例化得到一个池对象---》池是单例
        from dbutils.pooled_db import PooledDB
        import pymysql
        POOL = PooledDB(
                creator=pymysql,  # 使用链接数据库的模块
                maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
                mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
                maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
                maxshared=3,
                # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
                blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
                setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
                ping=0,
                # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                host='127.0.0.1',
                port=3306,
                user='root',
                password='lqz123?',
                database='cnblogs',
                charset='utf8'
            )
	-3 在视图函数中导入使用
	@app.route('/article')
    def article():
        conn = POOL.connection()

        cursor = conn.cursor(DictCursor)
        # 获取10条文章
        sql = 'select id,name,url from article limit 10'
        cursor.execute(sql)
        time.sleep(1)
        # 切换
        res = cursor.fetchall()
        print(res)
        return jsonify({'code': 100, 'msg': '成功', 'result': res})
        
        
####  池版
@app.route('/article')
def article():
    conn = POOL.connection()

    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    time.sleep(1)
    # 切换
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})
###  非池版
@app.route('/article')
def article():
    import pymysql
    from pymysql.cursors import DictCursor
    conn = pymysql.connect(user='root',
                           password="lqz123?",
                           host='127.0.0.1',
                           database='cnblogs',
                           port=3306)
    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    # 切换
    time.sleep(2)
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})

###  压测代码  jmeter工具---》java  
import requests
from threading import Thread


# 没有连接池
def task():
    # res = requests.get('http://127.0.0.1:8888/article')
    res = requests.get('http://127.0.0.1:8889/article')
    print(res.json())


if __name__ == '__main__':
    l = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        l.append(t)

    for i in l:
        i.join()


## 效果是:
	使用池的连接数明显小
    不使用池连接数明显很大
    
    
# 查看数据库连接数
show status like %Threads%;

# django 的 mysql操作,有连接池?
	-没有---》一个请求--》就是一个新的连接
    
    -django中引入连接池---》自行搜索
    
# flask 后期,使用sqlalchemy,自带连接池---》这个不需要咱们处理了

5 wtforms ?

# 生成表单,做数据校验
# django---》forms组件--》django内置的
	1 写个类,继承Form,在里面写字段
    2 requset.POST 要校验的数据  form=MyForm(request.Post)   form.is_valiad()
    3 可以在模板上,快速生成form表单
    
    
# wtform就是用来做表单校验,和生成表单
	

6 flask-script---》新版本不用了 ?

# flask 老版本中,没有命令运行项目,自定制命令

# flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令

# 新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了



# flask-migrate 老版本基于flask-script,新版本基于flask-click写的

### 使用步骤
	-1 pip3 install  Flask-Script==2.0.3
    -2 pip3 install flask==1.1.4
    -3 pip3 install markupsafe=1.1.1
	-4 使用
    from flask_script import Manager
    manager = Manager(app)
    if __name__ == '__main__':
    	manager.run()
    -5 自定制命令
    @manager.command
    def custom(arg):
        """自定义命令
        python manage.py custom 123
        """
        print(arg)
        
    - 6 执行自定制命令
    python manage.py custom 123

6.1 具体使用


### 如果运行不了:报错 from flask._compat import text_type,降版本
# -1 pip3 install  Flask-Script==2.0.3
# -2 pip3 install flask==1.1.4
# -3 pip3 install markupsafe=1.1.1


from flask import Flask
from flask_script import Manager

app = Flask(__name__)
app.debug = True
manager = Manager(app)

####3 自定制命令
@manager.command
def custom(arg):
    """自定义命令
    python manage.py custom 123
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  cmd -n lqz -u xxxx
    执行: python manage.py  cmd --name lqz --url xxx
    """
    print(name, url)

# 比如定制一个命令---》导入初始化的省市的数据
# 比如定制一个命令---》传入一个excel---》把excel的数据同步到mysql表中---》openpyxl

@app.route('/')
def index():
    return 'index'


if __name__ == '__main__':
    # app.run(port=8888)
    manager.run()

6.2 新版本启用了(基于click)

# flask 自己有,基于click
	-运行flask    flask --app py文件名字:app run
        
        
##  1 flask 运行项目
flask --app py文件名字:app run

## 2 定制命令
@app.cli.command("create-user")
@click.argument("name")
def create_user(name):
    print(name)
    
# 命令行中执行
flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
flask create-user lqz

6.3 django中自定制命令

# 1 app下新建文件夹
	management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)

# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    help = '命令提示'
    def handle(self, *args, **kwargs):
		命令逻辑  
# 4 使用命令
python manage.py  py文件(命令名)

7 信号 ?

# https://flask.palletsprojects.com/en/3.0.x/signals/

# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

# 内置信号
	request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行

    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行

    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行

    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)

    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
    
    
# 通过内置信号,可以实现跟请求扩展类似的功能,但是他们是不同的方案,信号更丰富

signal:信号--》flask中的信号,django中也有


Semaphore :信号量,多把锁,https://zhuanlan.zhihu.com/p/489305763
import threading
def task(semaphore):
    # 请求访问资源
    semaphore.acquire()
    try:
        # 访问共享资源
        print("Accessing shared resource")
    finally:
        # 释放资源
        semaphore.release()

# 创建Semaphore对象
semaphore = threading.Semaphore(5)

# 创建50个线程
threads = [threading.Thread(target=access_resource, args=(semaphore,)) for _ in range(50)]

# 启动线程
for thread in threads:
    thread.start()

# 等待所有线程结束
for thread in threads:
    thread.join()

8.1 内置信号使用

# 内置信号机制实现
# 第一步:写一个函数
def render_logger(*args, **kwargs):
    print(args)
    print(kwargs)
    # kwargs.get('template')
    app.logger.info('模板渲染了')


# 第二步:跟内置信号绑定
signals.template_rendered.connect(render_logger)


# 第三步:正常操作,等信号触发

8.2 自定义信号

#### 自定义信号
# 第0步:定义自定义信号
from flask.signals import _signals

print_args = _signals.signal('print_args')


# 第二步:写个函数
def lqz(*args, **kwargs):
    print(args)
    print(kwargs)
    print('我执行了')


# 第三步:绑定信号
print_args.connect(lqz)


# 第四步:触发信号
@app.route('/home')
def home():
    print('xxxxxxxxx')
    # 第四步:触发信号
    print_args.send(value='xxxxxxxxx')
    return 'home'

8.3 信号的实际用途

# 1 新增一个用户,给管理员发条邮件
# 2 banner表中插入数据,就删除缓存
# 3 只要mysql增加一条记录,就把数据同步到 es中


8.4 django中如何使用信号

# banner表中插入数据,就删除缓存  伪代码

#1  定义一个函数
from django.db.models.signals import pre_save
def callBack(*args **kwargs):
    print(args)
    print(kwargs)
    instance=kwargs.get('instance') # 新增的对象
    # 通过对象,判断是增加的哪个表中的数据
    #删缓存
# 2 跟内置信号绑定
pre_save.connect(callBack)

# 3 等待内置信号触发

8 flask-cache ?

https://flask.palletsprojects.com/en/3.0.x/patterns/caching/