drf - 基于自定义表编写认证类、jwt源码剖析

发布时间 2023-09-22 12:46:22作者: Way*yy

补充点

翻译函数;
	只要做了国际化处理,就会显示当前国家的语言
    from django.utils.translation import gettext_lazy as _
    msg = _('Signature has expired.') # _是函数的别名,这个函数是翻译函数,只要做了国际化处理,它就是中文

基于自定义表编写认证类

class AuthAuthentication(BaseAuthentication):
    def authenticate(self, request):
        # 取出用户传入的token---->token带在哪里取决于后端的选择
        token = request.META.get("HTTP_TOKEN")

        try:
            # 验证token--->Django_jwt提供了一个方法jwt_decode_handler----->通过传入的token,校验,校验成功返回payload,校验失败抛出异常
            payload = jwt_decode_handler(token)
            # 1、通过payload得到当前用户
            # 2、认证类,配好以后,只要有认证,就会走到这里---->每次走到这个地方都会查一次数据库,当遇到高并发可能就会出现问题
            优化:
                方法一:
                    # 1、自己根据payload数据,创建一个用户,有的参数没设计,就会使用默认值,跟真是的用户还是有区别
                方法二:
                	# 1、直接返回用户的id,后续的request.data就是用户的id,等到想用的时候再根据用户的id查询用户的数据
            user = User.objects.get(pk=payload.get("user_id"))
        except jwt.ExpiredSignature:
            msg = "签名过期"
             # msg = _('Signature has expired.') # _是个函数的别名,这个函数是翻译函数,只要做了国际化,它就是中文
            raise AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = "解码签名有误"
            raise AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise AuthenticationFailed()
        return user, token

登录的接口

class AuthUserView(GenericViewSet):
    serializer_class = UserSerializer

    @action(methods=["POST"], detail=False)
    def login(self, request, *args, **kwargs):
        """
        基于auth_user表的多方式登录
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        # 拿到前端传入的用户名和密码,得到一个序列化类对象
        user_serializer = self.get_serializer(data=request.data)
        if user_serializer.is_valid():
            token = user_serializer.context.get("token")
            username = user_serializer.context.get("username")
            return Response({"code": 100, "msg": "登录成功", "token": token, "username": username})
        else:
            return Response({"code": 101, "msg": "账号或密码输入错误"})

路由

from django.contrib import admin
from django.urls import path, include
from app_one.views import AuthUserView, UsersView, BookView
from rest_framework.routers import SimpleRouter

router = SimpleRouter()
# Django的auth_user表
router.register("user", AuthUserView, "user")

urlpatterns = [
    path('admin/', admin.site.urls),
]
urlpatterns += router.urls

视图类

class BookView(ViewSet, ListAPIView):
    authentication_classes = [AuthAuthentication]

    def list(self, request, *args, **kwargs):
        return Response("你看到我了吗")

django-jwt源码分析

签发源码分析

1、签发源码入口:
	from rest_framework_jwt.views import obtain_jwt_token
	obtain_jwt_token的本质就是:obtain_jwt_token = ObtainJSONWebToken.as_view()
    
2、ObtainJSONWebToken的视图类:
	class ObtainJSONWebToken(JSONWebTokenAPIView):
        # 序列化类
        serializer_class = JSONWebTokenSerializer
        
3、JSONWebTokenAPIView的视图类:
    class JSONWebTokenAPIView(APIView):
        # 在局部禁用了权限和认证类
        permission_classes = ()
        authentication_classes = ()
        
        def post(self, request, *args, **kwargs):
            # serializer = JSONWebTokenSerializer(data=request.data) # 本质就是这个配置了序列化类
            serializer = self.get_serializer(data=request.data)

            if serializer.is_valid():
                # 这个地方是获取了我当前登录的用户,如果从serializer.object取不出来就从request.user中取
                user = serializer.object.get('user') or request.user
                # 获取token
                token = serializer.object.get('token')
                # 这个地方是定制返回格式
                response_data = jwt_response_payload_handler(token, user, request)
                response = Response(response_data)
                return response
            # 如果报错就会返回异常信息以及异常状态码400
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
            
4、校验用户名和密码以及生成token,写在了序列化----->全局钩子中------>序列化类的入口:JSONWebTokenSerializer
    class JSONWebTokenSerializer(Serializer):
            def validate(self, attrs):
                credentials = {
                    "username" : attrs.get("username") # 本质就是这个
                    self.username_field: attrs.get(self.username_field), 
                    'password': attrs.get('password')
                }
                # 判断我credentials这个字典中的所有value值是否为True
                if all(credentials.values()):
                    # authenticate django的auth,通过用户名和明文密码校验用户是否存的函数:authenticate
                    # user = AuthUser.objects.filter(username=XXX,password=XXX).first()
                    user = authenticate(**credentials)
                    # 如果user存在
                    if user:
                        # 如果用户是被锁定状态
                        if not user.is_active:
                            # 主动抛出异常
                            msg = _('User account is disabled.')
                            raise serializers.ValidationError(msg)
					  # 将user传入生成一个payload
                        payload = jwt_payload_handler(user)
					  # 返回token以及用户
                        return {
                            'token': jwt_encode_handler(payload),
                            'user': user
                        }
                    else:
                        msg = _('Unable to log in with provided credentials.')
                        raise serializers.ValidationError(msg)
               else:
                    msg = _('Must include "{username_field}" and "password".')
                    msg = msg.format(username_field=self.username_field)
                    raise serializers.ValidationError(msg)
                    
总结:
	前端携带用户名以及密码到后端----->执行了后端的post方法------>在后端生成了一个序列化类的对象----->走全局钩子----->全局钩子中通过用户名密码获得了用户(如果获取不到就抛出异常)----->获取到签发token----->返给视图类------>在视图类中取出来----->给前端

认证类的源码分析

1、认证源码入口:
	from rest_framework_jwt.authentication import JSONWebTokenAuthentication
    
2、JSONWebTokenAuthentication源码:
    class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
        def get_jwt_value(self, request):
            # 首先,方法通过调用get_authorization_header函数获取到请求头中的Authorization字段的值,并使用空格进行分割,得到一个列表auth
            auth = get_authorization_header(request).split()
            # 然后,方法获取到JWT的前缀,即api_settings.JWT_AUTH_HEADER_PREFIX的值,并将其转换为小写。
            auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
            # 如果这个列表为空,表示请求头中没有Authorization字段
            if not auth:
                # 如果配置中定义了JWT_AUTH_COOKIE,则会尝试从请求的COOKIES中获取JWT的值
                if api_settings.JWT_AUTH_COOKIE:
                    return request.COOKIES.get(api_settings.JWT_AUTH_COOKIE)
                # 否则返回None
                return None
            # 如果auth列表不为空,但是第一个元素不等于JWT的前缀,返回None
            if smart_text(auth[0].lower()) != auth_header_prefix:
                return None
            # 如果列表的长度为1,表示没有提供有效的凭据,抛出AuthenticationFailed异常
            if len(auth) == 1:
                msg = _('Invalid Authorization header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            # 如果auth列表长度大于2,表示凭据字符串包含了空格,抛出AuthenticationFailed异常
            elif len(auth) > 2:
                msg = _('Invalid Authorization header. Credentials string '
                        'should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)
            # 最后,方法返回auth列表中的第二个元素,即token
            return auth[1]
        
3、getget_authorization_header方法:
    def get_authorization_header(request):
        # 获取到请求头中以Authorization为键的值。
        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        # 如果获取到的值是字符串类型,则将其转换为字节串类型(bytes)
        if isinstance(auth, str):
            auth = auth.encode(HTTP_HEADER_ENCODING)
        # 返回这个用户对象
        return auth
    
authenticate方法:
	def authenticate(self, request):
        # 拿出前端传入的token,可能前端没传,就是None
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None: # None就不管了,认证类继续往后走,相当于没有认证,于是咱们才需要权限类
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)

        return (user, jwt_value)

权限介绍

# 所有项目都会有权限控制
# https://www.cnblogs.com/liuqingzheng/articles/16972464.html


#ACL(Access Control List,访问控制列表)---》针对互联网用户,多半是这个
	将用户与权限对接(多对多)
       张三   [发视频,点赞,评论]
       李四   [看视频]
    
    
    
# RBAC(Role-Based Access Control,基于角色的访问控制)--》公司内部项目
	将用户与角色对接,然后角色与对象的权限对接。
    django的admin+auth就是使用了这套认证机制
    
# ABAC(Attribute-Based Access Control,基于属性的访问控制)
    ABAC(Attribute-Based Access Control,基于属性的访问控制),又称为PBAC(Policy-Based Access Control,基于策略的访问控制),CBAC(Claims-Based Access Control,基于声明的访问控制)。

    传统的ACL、RBAC的架构是{subject,action,object},
    而ABAC的架构是{subject,action,object,contextual}且为他们添加了parameter(参数)。

    subject属性:比如用户的年龄、部门、角色、威望、积分等主题属性。

    action属性:比如查看、读取、编辑、删除等行为属性。

    object属性:比如银行账户、文章、评论等对象或资源属性。

    contextual属性:比如时段、IP位置、天气等环境属性
    
    
    
# python写公司内部项目比较多,使用rbac控制居多
	RBAC  是基于角色的访问控制(Role-Based Access Control )在 RBAC  中,权限与角色相关联,用户通过成为适当角色的成员而得到这些角色的权限。这就极大地简化了权限的管理。这样管理都是层级相互依赖的,权限赋予给角色,而把角色又赋予用户,这样的权限设计很清楚,管理起来很方便。
    
    
    
# rbac如何设计
	-用户表--->一堆用户:张三,李四
    -角色表(group组)--->用户的角色:后勤部,开发部,总裁办。。。
    -权限表----->放了一堆权限:发电脑,提交代码,删除代码,发工资
    -用户和角色多对多:一个用户属于多个角色,一个角色属于多个用户
    -角色 和权限多对多:一个角色有多个权限,一个权限属于多个角色
    --------rbac----通过5张表可以实现
    -django为了更细粒度划分---》多了一张表,用户和权限多对多

simpleui的使用

# 公司内部,做公司内的项目需要使用这套权限控制
## 方案一:使用django-admin写
# 有的公司,不怎么写前端,直接使用django的admin,快速写出一套具有权限管理的系统
# django admin的界面不好看:第三方美化-->simpleui

## 方案二:自己写,前端使用vue,后端使用django,做公司内部的项目
	-第三方开源的权限控制 项目
    	-python界:django-vue-admin   7期学长写的
        -java界:若依
        -go界:gin-vue-admin

        
#  django admin的美化: simpleui

作业

# 1 基于自定义表认证类
#2 分析djagno-jwt源码
#3 自己演示djagno的rbac权限控制
# 4 simpleui研究  侧边栏定制。。。
----------------------
#  尝试自己写一个jwt模块,你用