1、安装依赖
pip install flask
pip install cryptography
2、实现代码
import ast
import base64
import hmac
import random
import secrets
import string
import time
from functools import wraps

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from flask import Flask, jsonify, session, request, make_response

SECRET_KEY = "DnKRYZbvVzdhPlF01rtcxmi5Cj36AbCd"
# Number of nonce bytes prepended to every ciphertext; encrypt and decrypt
# must agree on this value to split the payload correctly.
NONCE_SIZE = 16

app = Flask(__name__)
app.config["SECRET_KEY"] = SECRET_KEY


# ========================= Data encryption / decryption =========================
def encrypt_aes_gcm(key, data):
    """
    Encrypt data with AES-GCM.

    :param key: key string; its UTF-8 encoding must be 16, 24 or 32 bytes long
    :param data: string to encrypt; a dict is serialized with ``str()`` first
    :return: base64 text of ``nonce + ciphertext`` (nonce is NONCE_SIZE bytes)
    """
    key = key.encode('utf-8')
    if isinstance(data, dict):
        data = str(data)
    data = data.encode('utf-8')
    # The nonce must be unpredictable and never reused with the same key, so
    # it comes from the OS CSPRNG rather than the `random` module.
    nonce = secrets.token_bytes(NONCE_SIZE)
    cipher = AESGCM(key)
    crypt_bytes = cipher.encrypt(nonce, data, associated_data=None)
    return base64.b64encode(nonce + crypt_bytes).decode()


def decrypt_aes_gcm(key, cipher_data):
    """
    Decrypt data produced by encrypt_aes_gcm.

    :param key: key string used for encryption
    :param cipher_data: base64 text returned by encrypt_aes_gcm
    :return: the decrypted plaintext string
    :raises ValueError: if the input is not valid base64 or is malformed
    :raises cryptography.exceptions.InvalidTag: if authentication fails
    """
    key = key.encode('utf-8')
    raw = base64.b64decode(cipher_data)
    # Split the payload back into the nonce and the ciphertext (+ tag).
    nonce = raw[:NONCE_SIZE]
    crypt_bytes = raw[NONCE_SIZE:]
    cipher = AESGCM(key)
    plaintext = cipher.decrypt(nonce, crypt_bytes, associated_data=None)
    return plaintext.decode()


# ========================= Authentication =========================
def generate_token(user_info, expiration=3600):
    """
    Generate an encrypted token.

    :param user_info: information to embed in the token
    :param expiration: token lifetime in seconds
    :return: the encrypted token string
    """
    expiration = int(time.time() + expiration)
    data = {'user_info': user_info, "expiration": expiration}
    return encrypt_aes_gcm(SECRET_KEY, data)


def decrypt_token(token):
    """
    Decrypt a token and return the payload it carries.

    :param token: token string produced by generate_token
    :return: the decoded payload (a dict for tokens made by generate_token)
    """
    data = decrypt_aes_gcm(SECRET_KEY, token)
    # The payload was serialized with str(dict); literal_eval parses that
    # format without executing arbitrary code the way eval() would.
    return ast.literal_eval(data)


def login_required(func):
    """
    Decorator that rejects requests without a valid, unexpired token.

    The token in the ``token`` request header must equal the one stored in
    the session and must still be within its lifetime.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Token stored in the session at login time.
        s_token = session.get("token")
        # Token sent by the client with this request.
        r_token = request.headers.get("token")
        # A missing token on either side is a failure; otherwise both being
        # None would compare equal and slip through to decryption.
        if not s_token or not r_token:
            return jsonify(code="4000", msg="鉴权失败")
        # Constant-time comparison of the two tokens.
        if not hmac.compare_digest(
            str(s_token).encode("utf-8"), str(r_token).encode("utf-8")
        ):
            return jsonify(code="4000", msg="鉴权失败")
        # A malformed or tampered token is an auth failure, not a 500.
        try:
            data = decrypt_token(r_token)
            expiration = int(data["expiration"])
        except (InvalidTag, ValueError, SyntaxError, KeyError, TypeError):
            return jsonify(code="4000", msg="鉴权失败")
        if expiration < int(time.time()):
            return jsonify(code="4010", msg="授权失效")
        return func(*args, **kwargs)

    return wrapper


# ========================= API endpoints =========================
@app.post('/login')
def login():
    """Check the posted credentials and issue a token on success."""
    # SECURITY: the body was previously passed to eval(), which let any
    # client execute arbitrary code. The body is now parsed strictly as JSON.
    req = request.get_json(force=True, silent=True)
    if not isinstance(req, dict):
        return jsonify(code="1000", msg="用户名或密码错误")
    username = req.get('username')
    password = req.get('password')
    # Both fields must match; with `and`, one correct field was enough.
    if username != "admin" or password != "admin":
        return jsonify(code="1000", msg="用户名或密码错误")
    # Credentials accepted: build the user info and its token.
    user_info = {"user_id": "12345"}
    token = generate_token(user_info)
    # Remember the token server-side so later requests can be matched to it.
    session["token"] = token
    resp = make_response(jsonify(code="0", data={'token': token}, msg="登录成功"))
    resp.headers["token"] = token
    return resp


@app.post('/index')
@login_required
def index():
    """Sample endpoint that requires a valid token."""
    return jsonify(code="0", data="index", msg="请求成功")


if __name__ == '__main__':
    app.run()
流程图
3、测试