drf(JWT认证)

发布时间 2023-10-10 22:32:59作者: coder雪山

一. jwt实现过程

1. 构建jwt过程

第一: 用户提交用户名和密码给服务端,如果登录成功,使用jwt创建一个token,并给用户返回

eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InpjYyIsImV4cCI6MTU5NDczODg5MX0.OCG4mUhs_yXIkxtxvG9MWJWjpbvnSGDcqMVtpsn_0mo

第二步: 构建三段字符串之间的关系

# 第一段字符串 headers内部包含了算法 和 token类型。
       流程: 先将python类型对象装换成json格式字符串, 然后做base64加密
    headers = {
        'typ': 'jwt',
        'alg': 'HS256',
    }    

    
# 第二段字符串payload,自定义的值
    流程: 先将python类型对象装换成json格式字符串,然后做base64加密
    payload = {
        'user_id': user.pk,
        'username': username,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=300),  # 超时时间
    }
    
# 第三段字符串 
    第一步:把1,2部分base64加密过后的结果进行拼接加密
    第二步:对前2部分的加密结果进行hs256加密 + 加盐
    第三步:对hs256加密后的密文在进行base64url加密再拼接到前1, 2部分base64格式的末尾作为sign.

第三步: 以后用户访问时,需要携带token,后端需要对token校验

2. 校验jwt过程

第一步: 获取token

第二步: 对token进行切割, 获取第二段内容进行base64解密,获取payload信息, 检查超时时间是否超时

第三步:由于第三部分的字符串不能反解,把第一和第二段在进行hs256加密

1. 把1,2部分base64的密文拼接加密
2. 对前2部分加密进行hs256加密+加盐得到密文
3. 再将密文机进行base64加密, 与前两段的base64d格式的密文进行对比, 如果相等,表示token没有修改通过.

二. drf-jwt安装

官网: http://getblimp.github.io/django-rest-framework-jwt/

安装: pip install djangorestframework-jwt

三. 使用内置jwt认证+签发token

1. 快速使用

urls.py

# 路由中配置
# 提示: 
'''
obtain_jwt_token本质是由ObtainJSONWebToken类调用as_view类方法实例化出来的, 其实路由中这样写也可以:
    path('login/', ObtainJSONWebToken.as_view()),
'''

from rest_framework_jwt.views import ObtainJSONWebToken, obtain_jwt_token, JSONWebTokenAPIView, VerifyJSONWebToken

urlpatterns = [
    # path('login/', ObtainJSONWebToken.as_view()),
    path('login/', obtain_jwt_token),
]

views.py

from rest_framework_jwt.authentication import JSONWebTokenAuthentication


# Create your views here.
# 快速实现jwt
class BookAPIView(APIView):
    authentication_classes = [JSONWebTokenAuthentication, ]

    def get(self, request):
        return Response('OK')

解析: 为什么路由中配置了obtain_jwt_token用户认证, 签发token等等都不需要写了?

# 帮我们写了视图认证实现接受用户请求及基于请求响应:
    看继承关系: obtain_jwt_token = ObtainJSONWebToken.as_view() -> ObtainJSONWebToken -> JSONWebTokenAPIView
    JSONWebTokenAPIView就是我们的视图类. 它里面写了post方法, 处理我们的认证请求.
    
# 帮我们写了序列化器实现了token的签发:
    class ObtainJSONWebToken(JSONWebTokenAPIView):
        # JSONWebTokenSerializer内部就在序列换器里面使用了validate钩子, 实现了token的签发
        serializer_class = JSONWebTokenSerializer

使用内置提供的认证

头部访问格式: 使用内置的如果没有修改配置文件中配置的前缀, 那么jwt前缀必须要加, 如果不加前缀认证就返回None, 认证就失效了. 大小写都行.

Authorizationjwt eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InlhbmciLCJleHAiOjE1OTUwODY3NDZ9.aaehvGOl3AMI5gfU2Z9L8GH015pWIitOCXLgBJ5zl8E
Authorization JWT eyJ0eXAiOiJqd3QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6InlhbmciLCJleHAiOjE1OTUwODY3NDZ9.aaehvGOl3AMI5gfU2Z9L8GH015pWIitOCXLgBJ5zl8E

拓展: 认证前缀可以修改

from rest_framework_jwt import settings
'JWT_AUTH_HEADER_PREFIX': 'JWT',
rom rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework_jwt.authentication import JSONWebTokenAuthentication


# Create your views here.
# 使用jwt提供的认证类,局部使用

# 可以通过认证类JSONWebTokenAuthentication和权限类IsAuthenticated,来控制用户登录以后才能访问某些接口
# 如果用户不登录就可以访问,只需要把权限类IsAuthenticated去掉就可以
class Order(APIView):
    authentication_classes = [JSONWebTokenAuthentication, ]
    permission_classes = [IsAuthenticated, ]

    def get(self, request, *args, **kwargs):
        return Response('这是订单信息')

3 使用内置认证控制登录成功时response返回的格式

utils.py

from rest_framework_jwt.utils import jwt_response_payload_handler


def custom_jwt_response_payload_handler(token, user=None, request=None):
    # 返回什么, 认证成功时就返回什么格式
    return {
        'status': 1000,
        'messages': '登录成功',
        'token': token,
        'username': user.username
    }

settings.py

# 第二步: settings.py文件中配置成自己的路径即可
JWT_AUTH = {
    # utils.jwt_response_payload_handler.custom_jwt_response_payload_handler
    'JWT_RESPONSE_PAYLOAD_HANDLER':
        'utils.jwt_response_payload_handler.custom_jwt_response_payload_handler',
}

四. 自定义jwt认证+签发

1. 自定义jwt认证

1) 继承BaseAuthentication实现

utils.py

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework_jwt.utils import jwt_decode_handler  # 用上面的也可以
import jwt
from app01 import models


class MyJwtAuthentication(BaseAuthentication):
    def authenticate(self, request):
        jwt_value = request.META.get('HTTP_AUTHORIZATION')
        if jwt_value:
            try:
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                raise AuthenticationFailed('签名过期!')
            except jwt.DecodeError:
                raise AuthenticationFailed("签名解码错误!")
            except jwt.InvalidTokenError:
                raise AuthenticationFailed('token无效!')
            except Exception as e:
                raise AuthenticationFailed(str(e))
            print(payload)
            # {'user_id': 3, 'username': 'zd', 'exp': 1696772425, 'email': ''}
            # 方式一: 缺点, 查数据库耗费时间
            # user_obj = User.objects.get(pk=payload.get('user_id'))
            # print('user_obj.phone:', user_obj.phone)  # 17621839222

            # 方式二: 缺点, 没有传递的数据就获取不到
            user_obj = models.UserInfo(id=payload.get('user_id'), username=payload.get('username'))
            print('user_obj.phone:', [user_obj.phone])
            return user_obj, jwt_value  # ['']
        # 没有携带值,直接抛异常
        raise AuthenticationFailed('没有携带认证信息')

views.py

from app02.utils import MyJwtAuthentication


class Goods(APIView):
    authentication_classes = [MyJwtAuthentication, ]

    def get(self, request, *args, **kwargs):
        return Response('商品信息')

2) 继承BaseJSONWebTokenAuthentication + 手动get获取jwt_value 或者 自动获取jwt_value实现

utls.py

import jwt
from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication

from rest_framework_jwt.authentication import jwt_decode_handler
from rest_framework_jwt.utils import jwt_decode_handler

from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import APIException

from rest_framework.authentication import get_authorization_header


class MyJwtAuthentication(BaseJSONWebTokenAuthentication):
    def authenticate(self, request):
        # 手动获取jwt_value
        # jwt_value = request.META.get('HTTP_AUTHORIZATION')
        # 自动获取jwt_value
        jwt_value = get_authorization_header(request)
        if jwt_value:
            try:
                payload = jwt_decode_handler(jwt_value)
            except jwt.ExpiredSignature:
                raise AuthenticationFailed('签名过期!')
            except jwt.DecodeError:
                raise AuthenticationFailed("签名解码错误!")
            except jwt.InvalidTokenError:
                raise AuthenticationFailed('token无效!')
            except Exception as e:
                raise AuthenticationFailed(str(e))
            user_obj = self.authenticate_credentials(payload)
            print('user_obj.phone:', [user_obj.phone])
            return user_obj, jwt_value  # ['']
        # 没有携带值,直接抛异常
        raise AuthenticationFailed('没有携带认证信息')

views.py

from app02.utils import MyJwtAuthentication


class Goods(APIView):
    authentication_classes = [MyJwtAuthentication, ]

    def get(self, request, *args, **kwargs):
        return Response('商品信息')

2. 自定义签发token

# 使用用户名,手机号,邮箱,都可以登录
# 前端需要传的数据格式
{
"username":"lq/13232333333/djd@163.com",
"password":'123'
}

1)多方式登录,逻辑写在序列化类中

views.py

from rest_framework.viewsets import ViewSet
from app02.ser import LoginModelSerializer


# class Login1APIView(ViewSetMixin,APIView)
class Login1APIView(ViewSet):  # 跟上面完全一样
    """
    继承ViewSet意义:
        1. 修改视图类中方法, 使用login明确提意
        2. 继承了APIView, 具有较高的可控性
    """

    def login(self, request, *args, **kwargs):
        # 1 需要 有个序列化的类
        login_ser = LoginModelSerializer(data=request.data)
        # 2 生成序列化类对象
        # 3 调用序列化对象的is_validate
        login_ser.is_valid(raise_exception=True)
        token = login_ser.context.get('token')
        username = login_ser.context.get('username')
        return Response({'status': 1000, 'msg': '登录成功', 'token': token, 'username': username})

ser.py

from rest_framework import serializers
import re
from app01 import models
from rest_framework.exceptions import ValidationError
from rest_framework_jwt.utils import jwt_encode_handler, jwt_payload_handler


class LoginModelSerializer(serializers.ModelSerializer):
    # 需要覆盖低下的username字段,数据中是unique,post,认为你保存数据,自己校验不过,不然走不到validate
    username = serializers.CharField()
    class Meta:
        model = models.UserInfo
        fields = ['username', 'password']

    def validate(self, attrs):
        # 在这里写逻辑
        username = attrs.get('username')  # 用户名有三种方式
        password = attrs.get('password')
        # 通过判断,username数据不同,查询字段不一样
        # 正则匹配,如果是手机号
        if re.match(r'^1[3-9][0-9]{9}$', username):
            user_obj = models.UserInfo.objects.filter(phone=username).first()
        elif re.match('^.*?@.*?\.com$', username):
            user_obj = models.UserInfo.objects.filter(email=username).first()
        else:
            user_obj = models.UserInfo.objects.filter(username=username).first()
        if user_obj:  # 用户存在
            # 校验密码,因为是密文,要用check_password
            if user_obj.check_password(password):
                # 签发token
                payload = jwt_payload_handler(user_obj)
                token = jwt_encode_handler(payload)
                self.context['token'] = token
                self.context['username'] = user_obj.username
                return attrs
            else:
                raise ValidationError('密码错误')
        else:
            raise ValidationError('用户不存在')


'''
payload = jwt_payload_handler(user_obj) # 把user传入,得到payload
token = jwt_encode_handler(payload) # 把payload传入,得到token
'''

urls.py

urlpatterns = [
    path('login1/', views.Login1APIView.as_view({'post': 'login'})),
]

2) 多方式登录,逻辑写在视图类中

views.py

# 视图代码
import re
from rest_framework.viewsets import ViewSet
from rest_framework_jwt.utils import jwt_payload_handler
from rest_framework_jwt.utils import jwt_encode_handler
from rest_framework.exceptions import ValidationError

from app01.models import User


class LoginAPIView(ViewSet):
    """
    继承ViewSet意义:
        1. 修改视图类中方法, 使用login明确提意
        2. 继承了APIView, 具有较高的可控性
    """
    def login(self, request, *args, **kwargs):
        username = request.data.get('username')
        password = request.data.get('password')

        # username=egon/111@qq.com/17621839222
        if re.search(r'^1[3-9][0-9]{9}$', username):
            user = User.objects.filter(phone=username).first()
        elif re.search(r'^.*?@.*?qq\.com$', username):
            user = User.objects.filter(email=username).first()
        else:
            user = User.objects.filter(username=username).first()

        if user:
            # 校验密码,因为是密文,要用check_password
            is_login = user.check_password(raw_password=password)
            if is_login:
                # 签发token
                payload = jwt_payload_handler(user)
                token = jwt_encode_handler(payload)
                return Response({'status': 1000, 'token': token, 'results': {'username': user.username, 'email': user.email}})
            raise ('用户密码错误!')
        raise ValidationError("用户名错误!")

五. jwt的配置参数: 过期时间配置

import datetime

JWT_AUTH = {
    # 过期时间七天
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),
}

六. base64编码与解码

# md5 固定长度,不可反解
# base63 变长,可反解

# 编码(字符串,json格式字符串)
import base64
import json

dic = {'name': 'lq', 'age': 18, 'sex': ''}
dic_str = json.dumps(dic)

res = base64.b64encode(dic_str.encode('utf-8'))
print(res)

# 解码
res1 = base64.b64decode(res)
res2 = json.loads(res1)
print(res1, res2)