drf-jwt

发布时间 2023-05-30 20:04:56作者: 橘子熊何妨

jwt原理

使用jwt认证和使用session认证的区别

session是将用户信息存放在数据库中,在客户端存一个随机的字符串,下次用户提交的时候将字符串与数据库中存在的进行比较
jwt是根据用户提交的信息,产生一个token,将token返回给客户端,下次用户提交的时候,将token进行解析

三段式

# 这是一个token
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.TJVA95OrM7E2cBab30RMHrHDcEfxjoYZgeFONFh7HgQ
# 1. header,jwt的头部
主要是声明类型,指明这里是jwt,声明加密的算法 通常直接使用 HMAC SHA256
展示公司信息
{
  'typ': 'JWT',
  'alg': 'HS256'
}
然后将头部进行base64编码,构成了第一部分.
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9

# 2. payload荷载,存放有效信息的地方
用户名,用户id,过期时间(exp)
{
  "exp": "1234567890",
  "name": "John Doe",
  "user_id":99
}
然后将其进行base64编码,得到JWT的第二部分。
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9

# 3. signature,JWT的第三部分是一个签证信息,这个签证信息由三部分组成:
header (base64后的)
payload (base64后的)
secret
由编码后的头部和编码后的第二部分,还有加的盐,进行字符串拼接,构成第三部分
然后三合一就变成了jwt全部
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.TJVA95OrM7E2cBab30RMHrHDcEfxjoYZgeFONFh7HgQ

jwt开发流程

分为两部分:
1. 签发token的过程,用户名携带密码,通过校验,生成token并返回给前端
2. token认证过程,在登陆认证时使用,就是认证类,在认证类中完成对token的认证操作,用户访问需要登陆后才能访问的接口就必须携带token串,取出token,验证其是否过期,被篡改,被伪造。都正常的情况下,就会根据荷载中的id查出用户信息放入request

drf-jwt快速使用

# djagno+drf框架中,使用jwt来做登录认证

# 使用第三方:
	-django-rest-framework-jwt:https://github.com/jpadilla/django-rest-framework-jwt
    -djangorestframework-simplejwt:https://github.com/jazzband/djangorestframework-simplejwt
    
# 我们可以自己封装 :https://gitee.com/liuqingzheng/rbac_manager/tree/master/libs/lqz_jwt


# 安装
	pip3 install djangorestframework-jwt
    
# 补充:
	密码就算明文一样,但是每次加密后都不一样,因为是动态加盐,产生一个随机字符串拼接到一起在进行加密,动态的盐也要保存起来
    身为程序员,有代码,有数据库,就没有登不进去的系统
# 至今解决不了:
	token被获取,模拟发送请求
    不能篡改
    不能伪造
# 快速使用
	-签发过程(快速签发),必须是auth的user表(人家帮你写好了)
    	-登录接口--》基于auth的user表签发的
    -认证过程
    	-登录认证,认证类(写好了)
        
# 总结:
	-签发:只需要在路由中配置
        from rest_framework_jwt.views import obtain_jwt_token
        urlpatterns = [
            path('login/', obtain_jwt_token), 
        ]
    -认证:视图类上加
    	class BookView(APIView):
            authentication_classes = [JSONWebTokenAuthentication] # 认证类,drf-jwt提供的
            permission_classes = [IsAuthenticated] # 权限类,drf提供的
    -访问的时候,要在请求头中携带,必须叫
    	Authorization:jwt token串

drf-jwt定制返回格式

# 登录签发token的接口,要返回code,msg,username,token等信息

# 视图
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_jwt.authentication import JSONWebTokenAuthentication


class BookView(APIView):
    authentication_classes = [JSONWebTokenAuthentication]
    permission_classes = [IsAuthenticated]

    def login(self, request):
        return Response()


def common_response(token, user=None, request=None):
    return {
        'code': 200,
        'msg': '登陆成功',
        'username': user.username,
        'token': token,
    }
    
# 配置文件
JWT_AUTH = {
    'JWT_RESPONSE_PAYLOAD_HANDLER':
    'app01.views.common_response',
}

# 路由
from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
    path('login/', obtain_jwt_token),
]

drf-jwt自定义用户表签发

# 创建一个用户表
class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=64)
    phone = models.IntegerField()
    age = models.IntegerField()
    email = models.CharField(max_length=64)

# 登录接口
class Userview(ViewSet):

    def login(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        user = User.objects.filter(username=username).first()
        pwd = user.password
        res = pbkdf2_sha256.verify(password, pwd)
        if res:
            payload = jwt_payload_handler(user)
            token = jwt_encode_handler(payload)
            return Response(common_response(token, user, request))
        else:
            return Response(common_response(code=100, msg='用户名或者密码错误'))

# 注册用户
class UserDetailView(ViewSet):
    def reg(self, request):
        username = request.data.get('username')
        password = request.data.get('password')
        email = request.data.get('email')
        age = request.data.get('age')
        phone = request.data.get('phone')
        if not all((username, password, phone, email, age)):
            raise APIException('用户名,密码,手机号,邮箱,年龄,均不能为空')
        salt = uuid.uuid4()
        new_pwd = pbkdf2_sha256.hash(password, rounds=200000, salt=salt.bytes)
        User.objects.create(username=username, password=new_pwd, email=email, age=age, phone=phone)
        return Response(common_response(msg='注册成功'))
# make_password的源码
def make_password(password, salt=None, hasher='default'):
    if password is None:  # 不能是空
        return UNUSABLE_PASSWORD_PREFIX + get_random_string(UNUSABLE_PASSWORD_SUFFIX_LENGTH)
    if not isinstance(password, (bytes, str)):  # 必须是字符串或者二进制否则就报错
        raise TypeError(
            'Password must be a string or bytes, got %s.'
            % type(password).__qualname__
        )
    hasher = get_hasher(hasher)  # 获取加密方式
    salt = salt or hasher.salt()  # 获取盐
    return hasher.encode(password, salt)  # 加密后编码

drf-jwt自定义认证类

# drf的认证类定义方式,之前学过
# 在认证类中,自己写逻辑
class JWTAuthentication(BaseAuthentication):
    def authenticate(self, request):
        if request.method == 'GET':
            token = request.META.get('HTTP_TOKEN')
            try:
                payload = jwt_decode_handler(token)
                user_id = payload.get('user_id')
                user = User.objects.get(pk=user_id)

            except jwt.ExpiredSignature:
                raise AuthenticationFailed('token过期')
            except jwt.DecodeError:
                raise AuthenticationFailed('解码失败')
            except jwt.InvalidTokenError:
                raise AuthenticationFailed('token认证异常')
            except Exception:
                raise AuthenticationFailed('token认证异常')
            return user, token

drf-jwt的签发源码分析

# from rest_framework_jwt.views import obtain_jwt_token
# obtain_jwt_token就是ObtainJSONWebToken.as_view()---》视图类.as_view()

# 视图类
class ObtainJSONWebToken(JSONWebTokenAPIView):
    serializer_class = JSONWebTokenSerializer
    
# 父类:JSONWebTokenAPIView
class JSONWebTokenAPIView(APIView):
    # 局部禁用掉权限和认证
    permission_classes = ()
    authentication_classes = ()
    def post(self, request, *args, **kwargs):
        # serializer=JSONWebTokenSerializer(data=request.data)
        serializer = self.get_serializer(data=request.data)
		# 调用序列化列的is_valid---》字段自己的校验规则,局部钩子和全局钩子
        # 读JSONWebTokenSerializer的局部或全局钩子
        if serializer.is_valid(): # 全局钩子在校验用户,生成token
            # 从序列化类中取出user
            user = serializer.object.get('user') or request.user
            # 从序列化类中取出token
            token = serializer.object.get('token')
            # 咱么定制返回格式的时候,写的就是这个函数
            response_data = jwt_response_payload_handler(token, user, request)
            response = Response(response_data)
            return response

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
    
    
    
    
# JSONWebTokenSerializer的全局钩子
class JSONWebTokenSerializer(Serializer):
    def validate(self, attrs):
        # attrs前端传入的,校验过后的数据 {username:lqz,password:lqz12345}
        credentials = {
            'username': attrs.get('usernme'),
            'password': attrs.get('password')
        }

        if all(credentials.values()): # 校验credentials中字典的value值是否都不为空
            # user=authenticate(username=前端传入的,password=前端传入的)
            # auth模块的用户名密码认证函数,可以传入用户名密码,去auth的user表中校验用户是否存在
            # 等同于:User.object.filter(username=username,password=加密后的密码).first()
            user = authenticate(**credentials)
            if user:
                if not user.is_active:
                    msg = _('User account is disabled.')
                    raise serializers.ValidationError(msg)

                payload = jwt_payload_handler(user)

                return {
                    'token': jwt_encode_handler(payload),
                    'user': user
                }
            else:
                msg = _('Unable to log in with provided credentials.')
                raise serializers.ValidationError(msg)
        else:
            msg = _('Must include "{username_field}" and "password".')
            msg = msg.format(username_field=self.username_field)
            raise serializers.ValidationError(msg)

认证源码

# from rest_framework_jwt.authentication import JSONWebTokenAuthentication
# JSONWebTokenAuthentication
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
    def get_jwt_value(self, request):
        # auth=['jwt','token串']
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
        if not auth:
            if api_settings.JWT_AUTH_COOKIE:
                return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
            return None
        if smart_text(auth[0].lower()) != auth_header_prefix:
            return None
        if len(auth) == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)
        return auth[1] # token串
    
# 父类中找:BaseJSONWebTokenAuthentication---》authenticate,找到了
class BaseJSONWebTokenAuthentication(BaseAuthentication):
    def authenticate(self, request):
        # 拿到前端传入的token,前端传入的样子是  jwt token串
        jwt_value = self.get_jwt_value(request)
        # 如果前端没传,返回None,request.user中就没有当前登录用户
        # 如果前端没有携带token,也能进入到视图类的方法中执行,控制不住登录用户
        # 所以咱们才加了个权限类,来做控制
        if jwt_value is None:
            return None
        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()
        # 通过payload拿到当前登录用户
        user = self.authenticate_credentials(payload)
        return (user, jwt_value)

    
    
# 如果用户不携带token,也能认证通过
# 所以我们必须加个权限类来限制

class IsAuthenticated(BasePermission):
    def has_permission(self, request, view):
        return bool(request.user and request.user.is_authenticated)