Django RestFramework、Celery及Channels

发布时间 2023-10-10 11:37:20作者: 韩志超

Django REST Framework

什么是RESTful API

RESTful API是一种基于HTTP协议的接口设计风格,它使用统一的接口和资源的概念来定义和操作网络应用程序的功能和数据。RESTful API使用HTTP动词(GET、POST、PUT、DELETE等)来表示操作类型,并使用URL来标识资源。

传统风格的HTTP接口

常用授权方式:Cookie / Session

  • GET /get_book_list
  • GET /get_book_detail?id=1
  • POST /create_book
  • POST /update_book?id=1
  • POST /delete_book?id=1

RESTful风格的HTTP接口

常用授权方式:Token

  • /api/v1/books/ GET 列表 POST 创建
  • /api/v1/books/1 GET详情 PUT修改 DELETE 删除

image

RESTful API基础特点

  • 基于资源:接口端点使用名词(复数)
  • 通一的接口:同一个接口路径支持多种请求方法,GET获取资源信息,POST新建或更新资源,PUT更新资源,DELETE删除资源。
  • 使用HTTP状态码表示操作结果:如200成功,201创建成功,204删除成功等。
  • 接口专有路径及分版本管理:接口路径统一放在 /api/ 下,接口分版本管理 /api/v1/... /api/v2/..
  • 无状态:身份认证推荐使用OAuth2.0,或Basic Auth,token等,避免使用Cookie和Session
  • 数据格式:尽量使用JSON, 避免使用XML

Django Rest Framework介绍

Django REST Framework(DRF)是一个用于构建强大的Web API的开源框架,它是基于Django的Python Web框架的扩展。DRF提供了一组工具和库,使得构建和发布Web API变得简单和高效。

  • 快速开发:DRF提供了许多内置的功能和工具,使得快速构建和开发Web API变得更加容易。它提供了一套强大的类视图(Class-based Views)和序列化器(Serializers),可以快速创建API视图和处理数据序列化。
  • 强大的序列化:DRF的序列化器提供了灵活和强大的数据序列化和反序列化功能。它可以将复杂的数据模型转换为JSON或其他格式,并处理数据验证和转换。
  • 路由和URL配置:DRF提供了简单而灵活的路由器(Router)来自动生成URL配置。它可以自动映射URL到视图集(ViewSet)和动作(Action),简化了URL配置的过程。
  • 认证和权限:DRF支持多种认证和权限控制机制,包括基于Token的认证、基于角色的权限控制等。它可以轻松地集成到现有的身份验证和权限系统中。
  • 分页和限流:DRF提供了分页器(Paginator)和限流器(Throttling)来处理数据分页和请求限制。它可以轻松地实现数据分页和限制API请求的频率。
  • 自动化文档生成:DRF提供了自动生成API文档的功能,包括交互式API浏览器(Browsable API)和自动生成的API文档。这使得API的文档和测试变得更加简单和方便。
  • 第三方库支持:DRF与许多其他常用的Python库和工具集成良好,如Django ORM、JWT认证、OAuth2认证等。这使得开发人员可以根据需求选择适合的工具和库来构建API。

从FBV到CBV到通用视图View视图

Django提供了两种编写视图的方式

  • 基于函数的视图(Function Base View, FBV):优点是比较直接,容易读者理解, 缺点是不便于继承和重用。
  • 基于类的视图(Class Based View, CBV):基于类的视图以类定义,使用类视图后可以将视图对应的不同请求方式以类中的不同方法来区别定义,相对于函数视图逻辑更清晰,代码也有更高的复用性。

基于函数的视图(接口)

def project_list(request):
    if request.method == 'GET':  # 获取项目列表
        projects = Project.objects.all()
        return render(request, 'project_list.html', 
                      {'projects': projects})
    if request.method == 'POST':  # 创建项目
        name = request.POST.get('name') 
        description = request.POST.get('description') 
        Project(name=name, description=description).save()
        return redirect('.')

# path('project_list/', views.project_list),

基于类的视图(接口)

from django.views import View
class ProjectList(View):
    def get(self, request):
        projects = Project.objects.all()
        return render(request, 'project_list.html',
                      {'projects': projects})
    def post(self, request):
        name = request.POST.get('name')
        description = request.POST.get('description')
        Project(name=name, description=description).save()
        return redirect('.')

# path('project_list/', views.ProjectList.as_view()),

基于类的通用视图

Django提供了很多通用的基于类的视图,来帮我们简化视图的编写:

  • 展示对象列表(比如所有用户,所有文章)- ListView
  • 展示某个对象的详细信息(比如用户资料,比如文章详情) - DetailView
  • 通过表单创建某个对象(比如创建用户,新建文章)- CreateView
  • 通过表单更新某个对象信息(比如修改密码,修改文字内容)- UpdateView
  • 用户填写表单后转到某个完成页面 - FormView
  • 删除某个对象 - DeleteView
from django.views.generic import ListView, CreateView

class ProjectList(ListView):
    queryset = Project.objects.all()
    template_name = 'project_list.html'
    context_object_name = 'projects'

class ProjectCreate(CreateView):
    form_class = CreateProjectForm   # 必须是ModelForm
    template_name = 'project_list.html'
    success_url = '.'

使用Django Rest Framework

  1. 安装
pip install djangorestframework
  1. 注册应用
INSTALLED_APPS = [ ... 'rest_framework', ]

传统Django接口写法

@require_http_methods(['GET', 'POST'])
def project_list(request):
    if request.method == 'GET':
        projects = Project.objects.all()
        data = [
            {'name': project.name, 'description': project.description}
            for project in projects
        ]
        return JsonResponse(data=data, safe=False)
    if request.method == 'POST':
        name = request.POST.get('name')
        description = request.POST.get('description')
        project = Project(name=name, description=description)
        project.save()
        data = {'name': project.name, 'description': project.description}
        return JsonResponse(data=data)

DRF接口写法

from rest_framework import serializers, status
from rest_framework.decorators import api_view
from rest_framework.response import Response

class ProjectSerializer(serializers.Serializer):
   name = serializers.CharField(max_length=128, required=True, allow_blank=False)
   description = serializers.CharField(required=False, allow_blank=True)

@api_view(['GET', 'POST'])
def project_list(request):
    if request.method == 'GET':
        projects = Project.objects.all()
        serializer = ProjectSerializer(projects, many=True)
        return Response(serializer.data)
    if request.method == 'POST':
        serializer = ProjectSerializer(data=request.data)
        if serializer.is_valid():
            serializer.save()  # 
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

# path('projects', views.project_list)

image

详情接口(GET获取 / PUT 修改 / DELETE 删除)示例

@api_view(['GET', 'PUT', 'DELETE'])
def project_detail(request, pk):
    try:
        project = Project.objects.get(pk=pk)
    except Project.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)

    if request.method == 'GET':
        serializer = ProjectSerializer(project)
        return Response(serializer.data)

    elif request.method == 'PUT':
        serializer = ProjectSerializer(project, data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    elif request.method == 'DELETE':
        project.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

# path('projects/<pk>', views.project_detail)

image

使用通用视图
详情接口(GET获取 / PUT 修改 / DELETE 删除)示例

from rest_framework import generics
from . import models

class ProjectList(generics.ListCreateAPIView):
    queryset = models.Project.objects.all()
    serializer_class = ProjectSerializer

class ProjectDetail(generics.RetrieveUpdateDestroyAPIView):
    queryset = models.Project.objects.all()
    serializer_class = ProjectSerializer

# path('projects', views.ProjectList.as_view())
# path('projects/<pk>', views.ProjectDetail.as_view())

序列化及反序列化

在Django REST Framework(DRF)中,序列化和反序列化是非常重要的概念,用于将数据模型转换为可传输的格式(如JSON)以及将传输的数据转换回数据模型。

from rest_framework import serializers

class ProjectSerializer(serializers.Serializer):
   name = serializers.CharField(max_length=128, required=True, allow_blank=False)
   description = serializers.CharField(required=False, allow_blank=True)

基于模型的序列化

class ProjectSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Project
        fields = ['name', 'description']  # 支持 '__all__' 代表所有字段

序列化器使用方法

  1. 实例化序列化器
  • 项目列表:serializer = ProjectSerializer(projects, many=True)
  • 项目详情:serializer = ProjectSerializer(project)
  • 创建项目:serializer = ProjectSerializer(data=request.data)
  • 修改项目:serializer = ProjectSerializer(project, data=request.data)
  1. 使用序列化器对象:
  • 返回转换(对象属性->JSON)后的数据:return Response(serializer.data)
  • 验证请求参数(创建/修改时):if serializer.is_valid()
  • 保存数据(创建/修改时):serializer.save()# 必须是ModelSerializer
  • 返回错误信息 serializer.errors

常用序列化字段参数

参数 说明
write_only 只写
read_only 只读
required 必填字段
max_length 最大长度(CharField)
default 默认值
allow_null 默认为False
allow_blank
source 对应模型源字段
validateors 验证器
error_messages 验证失败消息
lable 字段标签
help_text 帮助文本
iniital 初始设置
style 字段样式

参考:Django REST framework fields

常用序列化字段

布尔型

  • serializers.BooleanField

字符串类型

  • serializers.CharField
  • serializers.EmailField
  • serializers.RegexField
  • serializers.SlugField
  • serializers.URLField
  • serializers.UUIDField
  • serializers.FilePathField
  • serializers.IPAddressField

数字类型

  • serializers.IntergerField
  • serializers.FloatField
  • serializers.DecimalField

日期时间类型

  • serializers.DateTimeField
  • serializers.DateField
  • serializers.TimeField
  • serializers.DurationField

选项类型

  • serializers.ChoiceField
  • serializers.MultipleChoiceField
  • ...

参考:Django REST framework fields

序列化自定义字段

在序列化类中,可以使用SerializerMethodField,通过一个函数来自定义字段的返回值,这个函数名必须为get_字段名,同时自定义字段是只读的。

from django.contrib.auth.models import User
from django.utils.timezone import now
from rest_framework import serializers

class UserSerializer(serializers.ModelSerializer):
    days_since_joined = serializers.SerializerMethodField()

    class Meta:
        model = User
        fields = '__all__'

    def get_days_since_joined(self, obj):
        return (now() - obj.date_joined).days

列表(后端)分页配置

REST_FRAMEWORK = { 
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination', 'PAGE_SIZE': 10 
}

REST_FRAMEWORK = { 
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination', 'PAGE_SIZE': 10 
}

参考:Django Rest framework pagation

列表过滤、搜索及排序

安装django-filters, 注册及配置

pip install django-filter
INSTALLED_APPS = [ 
    ... 
    'django_filters', 
    ... 
]

REST_FRAMEWORK = { 
    'DEFAULT_FILTER_BACKENDS': ['django_filters.rest_framework.DjangoFilterBackend'] 
}

过滤器声明方法:

from rest_framework import generics
from . import models
from django_filters.rest_framework import DjangoFilterBackend

class ProjectList(generics.ListCreateAPIView):
    queryset = models.Project.objects.all()
    serializer_class = ProjectSerializer
    filter_backends = [DjangoFilterBackend]
    filterset_fields = ['id', 'name']  # 支持的过滤字段
    search_fields = ['id', 'name', 'description']  # 支持的搜索字段
    ordering_fields = ['id', 'name']  # 支持的排序字段

过滤器使用方法:

参考:Django Rest framework filtering

接口认证-基础认证

  1. 设置方法
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.BasicAuthentication', # 基础授权
        'rest_framework.authentication.SessionAuthentication',  # Cookie/Session授权
    ]
}
  1. 启用授权接口
urlpatterns += [
    path('api-auth/', include('rest_framework.urls')),
]

接口认证-Session认证

  1. 设置方法
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.BasicAuthentication', # 基础授权
        'rest_framework.authentication.SessionAuthentication',  # Cookie/Session授权
    ]
}
  1. 启用授权接口
urlpatterns += [
    path('api-auth/', include('rest_framework.urls')),
]

接口认证-Token认证

  1. 设置方法
INSTALLED_APPS = [ 
    ... 
    'rest_framework.authtoken' 
]
  1. 为每个用户创建Token
from django.contrib.auth.models import User
from rest_framework.authtoken.models import Token

for user in User.objects.all():
    Token.objects.get_or_create(user=user)
  1. 启用Token认证URL
from rest_framework.authtoken import views
urlpatterns += [
    path('api-token-auth/', views.obtain_auth_token)
]

参考:Django Rest framework authentication

Celery使用

Celery介绍

Celery是一个基于分布式消息传递的异步任务队列/调度器,用于处理大量并发的任务。它是一个开源的Python库,提供了简单而强大的工具,用于将任务分发到多个工作进程或机器上,并实现任务的异步执行。
Celery主要特点和功能:

  • 异步任务处理:Celery允许将耗时的任务放入任务队列中,然后异步地执行这些任务。这样可以避免阻塞主线程,提高应用程序的响应性能。
  • 分布式架构:Celery支持分布式架构,可以将任务分发到多个工作进程或机器上进行处理。这使得Celery非常适合处理大规模的任务并发。
  • 定时任务调度:Celery提供了定时任务调度的功能,可以按照预定的时间间隔或时间规则执行任务。这使得可以轻松地实现定时任务、定期任务和周期性任务。
  • 异常处理和重试:Celery具有强大的异常处理和重试机制。如果任务执行失败,Celery可以自动重试任务,或者将任务标记为失败并进行相应的处理。
  • 结果追踪和监控:Celery可以追踪任务的执行结果,并提供监控和统计信息。这使得可以轻松地跟踪任务的状态和性能,并进行相应的监控和调优。
  • 可扩展性和灵活性:Celery具有良好的可扩展性,可以根据需求进行水平扩展,增加工作进程或机器的数量。它还提供了丰富的配置选项和插件机制,可以根据具体需求进行定制和扩展。

Celery基础使用

  1. 安装Celery
pip install celery
  1. 安装中间件RabbitMQ (或Redis) 并启动

MacOS(需要安装brew)
安装:brew install rabbitmq
启动:brew services start rabbitmq

Winodows(推荐使用Chocolatey安装)

参考:https://www.rabbitmq.com/install-windows.html

安装Chocolatey:以管理员身份PowerShell中运行

Set-ExecutionPolicy Bypass -Scope Process -Force; iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))
choco install rabbitmq

Docker安装

docker run -d -p 5672:5672 rabbitmq
  1. Django项目配置
    settings.py中添加配置:CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
    在项目(settings.py同级)目录新建:celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<项目目录名>.settings')
app = Celery('<项目目录名>')
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks()

在项目(settings.py同级)目录编辑:init.py

from .celery import app as celery_app

__all__ = ('celery_app',)
  1. 编写任务
    在应用目录添加tasks.py
import time
from celery import shared_task

@shared_task
def run(n, interval):  # 支持可JSON序列化的参数
    for i in range(n):
        print('i =', i)
        time.sleep(interval)
  1. 使用任务
    在应用views.py中编写接口函数
def run_task(request):
    task = tasks.run.delay(30, 0.5)  # delay为异步运行
    result = AsyncResult(task.task_id)
    return JsonResponse({'status': result.state, 'task_id': task.task_id})

  1. 启动Worker进程
    在项目根目录下运行:
celery -A <项目目录名> worker -l INFO
  1. 访问接口 https://localhost:8000/run_task/ 可以看到worker进程的输出

image

Celery任务结果

如果想纪录任务的运行结果,可以使用django-celery-results库

  1. 安装:pip install django-celery-results
  2. 设置:INSTALLED_APPS = [ ... 'django_celery_results', ...] CELERY_RESULT_BACKEND = 'django-db'
  3. 重启Worker进程,运行任务,在Django Admin中可以查看到任务结果列表

image

Celery定时(周期性)任务

Celery定时任务,需要安装django-celery-beat库

参考: django-celery-beat

  1. 安装:pip instal django-celery-beat

  2. 注册:INSTALED_APPS = [... 'django_celery_beat', ...]

  3. 创建调度器

  • IntervalSchedule:间隔调度器
  • CrontabSchedule: Crontab调度器

IntervalSchedule使用方法

def add_period_task(request):
    schedule = IntervalSchedule.objects.create(every=10,  
                                               period=IntervalSchedule.SECONDS)
    task = PeriodicTask.objects.create(interval=schedule, 
                                       name='定时任务1', 
                                       task=tasks.run,
                                       args='[30,0.5]') # JSON字符串形式
    return JsonResponse({'status': 'ok', 'task_id': task.id})

CrontabSchedule使用方法

def add_period_task(request):
    schedule = CrontabSchedule.objects.create(
        minute='1',  # 每1分钟运行一次
        hour='*', 
        day_of_week='*',
        day_of_month='*', 
        month_of_year='*')
    task = PeriodicTask.objects.create(
        crontab=schedule,
        name='定时任务1',
        task=tasks.run,
        args='[30,0.5]'  # JSON字符串形式
    )
    return JsonResponse({'status': 'ok', 'task_id': task.id})
  1. 启动Beat进程(用与定时生成任务):
celery -A <项目目录名> beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

使用Channels

什么是WebSocket

WebSocket是一种在Web应用程序中实现双向通信的协议。它提供了一种持久化的连接,允许服务器主动向客户端推送数据,而不需要客户端发起请求。相比传统的HTTP请求-响应模式,WebSocket提供了实时性更好的双向通信能力。

WebSocket特点

  • 双向通信
  • 持久连接
  • 低延迟
  • 跨域支持
  • 协议标准化

Django Channels使用

Django Channels是一个用于处理实时Web应用程序的扩展库,为Django提供了对WebSocket和其他协议的支持。使用Django Channels需要Daphne异步服务支持,并需要Django3.0以上(建议Django4.0.0)。

  1. 安装方法:pip installl channels daphne
  2. 设置方法:
INSTALLED_APPDS = [... 'daphne', ...]

ASGI_APPLICATION = '<项目目录名>.asgi.application'
  1. 在项目目录urls.py中添加WebSocket主路由:websocket_urlpatterns = []
  2. 修改项目目录asgi.py异步网关配置:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from .urls import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings')

application = ProtocolTypeRouter(
    {
        'http':  get_asgi_application(),
        'websocket': URLRouter(websocket_urlpatterns)
    }
)
  1. 编写WebSocket接口(消费者)
    在应用目录新建consumers.py,内容如下:
class MyConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
        self.send('你已连接成功')

    def receive(self, text_data=None, bytes_data=None):
        self.send('已收到你的消息 %s' % text_data)
  1. 注册WebSocket接口路由
    在项目urls.py中websocket_urlpatterns中注册WebSocket接口路由
from <应用名>. import consumers

websocket_urlpatterns = [
    path('ws/my', consumers.MyConsumer.as_asgi())
]

  1. 使用Postman测试WebSocket接口

image

HTML连接WebSocket

  1. 在应用templates目录下新建HTML,如my.html
<h3>WebSocket消息</h3>
<div>
  <textarea id="receive" rows="10" cols="80"></textarea>
</div>
<input id="send"/>
<script>
  // 连接WebSocket
  socket = new WebSocket('ws://localhost:8000/ws/my');
  // 处理收到消息
  socket.onmessage = function(event){
    document.getElementById('receive').value += event.data + '\r\n';
  }
  // 处理按回车发送消息
  document.getElementById('send').onkeyup = function (event){
    if (event.key === 'Enter') {  // 按回车
        msg = document.getElementById('send').value
        socket.send(msg);
    }
  }
</script>

  1. 编写接口返回HTML页面,并注册url path('my/', views.my)
    在应用目录新建consumers.py,内容如下:
def my(request):
    return render(request, 'my.html')

  1. 访问页面 http://localhost:8000/my ,输入内容后按回车可以发送消息

image