Flask_实现token鉴权

发布时间 2023-03-26 13:02:16作者: 不自在

1、安装依赖

pip install flask
pip install pycryptodome

2、实现代码

import random
import string
import time
import base64

from functools import wraps

from flask import Flask, jsonify, session, request, make_response
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


SECRET_KEY = "DnKRYZbvVzdhPlF01rtcxmi5Cj36AbCd"


app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY


# ========================= 数据加密解密方法 ==============================================
def encrypt_aes_gcm(key, data):
    """
    AES-GCM加密
    :param key: 密钥。16, 24 or 32字符长度的字符串
    :param data: 待加密字符串
    """
    key = key.encode('utf-8')
    if isinstance(data, dict):
        data = str(data)
    data = data.encode('utf-8')
    # 生成长度为16位的随机值
    nonce = ''.join(random.sample(string.ascii_letters + string.digits, 16))
    nonce = nonce.encode("utf-8")

    # 生成加密器
    cipher = AESGCM(key)
    # 加密数据
    crypt_bytes = cipher.encrypt(nonce, data, associated_data=None)
    return base64.b64encode(nonce + crypt_bytes).decode()


def decrypt_aes_gcm(key, cipher_data):
    """
    AES-GCM解密
    :param key: 密钥
    :param cipher_data: encrypt_aes_gcm 方法返回的数据
    :return:
    """
    key = key.encode('utf-8')

    # 进行base64解码
    debase64_cipher_data = base64.b64decode(cipher_data)

    # 分割数据
    nonce = debase64_cipher_data[:16]
    cipher_data = debase64_cipher_data[16:]
    cipher = AESGCM(key)

    # 解密数据
    plaintext = cipher.decrypt(nonce, cipher_data, associated_data=None)
    return plaintext.decode()


# ========================= 鉴权部分 ==============================================
def generate_token(user_info, expiration=3600):
    """ 生成token生成token的密钥
    :param user_info: 生成token的信息
    :param expiration: token有效时间,单位秒
    """
    expiration = int(time.time() + expiration)
    data = {'user_info': user_info, "expiration": expiration}
    return encrypt_aes_gcm(SECRET_KEY, data)


def decrypt_token(token):
    """ 解析token """
    data = decrypt_aes_gcm(SECRET_KEY, token)
    return eval(data)


def login_required(func):
    """ 鉴权装饰器 """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 获取session存储的token
        s_token = session.get("token")

        # 获取请求中带的token
        r_token = request.headers.get("token")

        # 验证token是否匹配
        if s_token != r_token:
            return jsonify(code="4000", msg="鉴权失败")

        # 验证token是否失效
        data = decrypt_token(r_token)
        expiration = data.get("expiration")
        if expiration < int(time.time()):
            return jsonify(code="4010", msg="授权失效")
        return func(*args, **kwargs)
    return wrapper


# ========================= api接口 ==============================================
@app.post('/login')
def login():
    req = eval(request.data)
    username = req.get('username')
    password = req.get('password')

    # 验证账密
    if username != "admin" and password != "admin":
        return jsonify(code="1000", msg="用户名或密码错误")

    # 账密验证通过,获取用户信息
    user_info = {"user_id": "12345"}
    token = generate_token(user_info)

    # 存储token
    session["token"] = token

    # 定义响应信息
    resp = make_response(jsonify(code="0", data={'token': token}, msg="登录成功"))
    resp.headers["token"] = token
    return resp


@app.post('/index')
@login_required
def index():
    return jsonify(code="0", data="index", msg="请求成功")


if __name__ == '__main__':
    app.run()

流程图

 

3、测试