美文网首页
Vue3+TypeScript+Django Rest Fram

Vue3+TypeScript+Django Rest Fram

作者: 落霞__孤鹜 | 来源:发表于2021-08-24 09:26 被阅读0次

    一个完整的网站都是有前台和管理后台组成的,前台用来给真正的用户浏览和使用,后台用来给管理员管理网站内容,配置各种功能和数据等。博客的管理后台就是用来承载创建博客,发布博客,查看留言,管理博客用户这些功能的子系统。

    大家好,我是落霞孤鹜,上一篇我们已经实现了用户注册,登录,登出的功能,这一章我们开始搭建博客的管理后台,实现对博客网站的管理功能。我会同样按照一个完整的功能,从需求分析到代码编写来阐述如何实现。

    一、需求分析

    作为一个完整的博客系统,管理后台是内容管理核心部分,在Python和PHP的世界里面,有很多做内容管理的库和开源项目,功能也是丰富多彩。这里我们从实际需要出发,整理了如下需求要点:

    1. Dashboard: 主要展示整个博客网站的访问情况,包括浏览量,点赞量,评论量,留言量等内容。
    2. 分类管理:主要用来组织文章的分类,通过分类帮助用户更好的浏览整个博客网站。
    3. 标签管理:主要用来管理文章的标签,标注文章的类型,帮助用户更好的识别文档的类型。
    4. 文章管理:主要用来完成文章的新增,修改,发布,删除等,考虑到文章发布的方便,需要支持 Markdown 语法。
    5. 评论管理:主要用来查看文章的评论信息,如果存在敏感内容,可以通过后台进行删除。
    6. 用户管理:主要用来管理博客网站注册的用户信息,可以禁用用户等。

    以上功能也算是一套2B端产品的核心功能框架。

    二、后端接口开发

    后端承担业务逻辑处理和数据持久化的责任,基于需求分析中涉及的业务对象,我们需要先进行模型设计,映射到 Django 中,就是先建立 Model

    2.1 Model 层代码实现

    2.1.1 物理模型说明

    基于需求分析,通过对业务模型到物理模型的转换,这里主要有一下物理模型:

    1. 分类表:存储文章分类,与文章表的关系是一对多,即:一个分类可以关联多篇文章,一篇文章只能属于一个分类
    2. 标签表:存储文章的标签,与文章表的关系是多对多,即:一个标签可以属于多个文章,一篇文章可以管理多个标签
    3. 文章表:存储文章信息,需要记录文章的标题,摘要,正文,封面,浏览量,评论量,点赞量等
    4. 评论表:存储文章的评论信息,与文章表是多对一关系,即一篇评论只能关联一篇文章,但是一篇文章可以对应多篇评论
    5. 点赞表:记录用户点赞信息,与文章表是多对一关系,即一则点赞对应一篇文章,一篇文章对应多则点赞。
    6. 用户表:记录用户信息,包含博客的查看者,也包含网站的管理员

    2.1.2 代码实现

    2.1.2.1 安装依赖

    在分类表的设计中,我们经常采用的是邻接表的方式,通过一个parent_id自关联自己,实现父级和子级的关联,形成树形结构。这种设计在新增和修改的时候,非常方便,只需要一次查询即可完成,但是在父查子,子查父,删除等操作时却需要较多的IO损耗。

    而实际中,查询要比修改多,因此这里我们采用一种新的数据结构 MPTT,预排序遍历树,一种更高效的查询和管理树形数据的数据结构。因此需要安装依赖

    pip install django-mptt==0.12.0
    

    然后在 requirements.txt 中增加依赖信息

    django-mptt==0.12.0
    
    2.1.2.2 管理常量

    后端在处理各类业务时,会遇到各类枚举类型,比如用户的身份,性别,文章状态等等,在代码的世界里面,尽量不要用 Magic number ,而是通过常量的方式进行管理。

    common 下新增文件 constants.py ,编写代码如下:

    class Constant(object):
        ARTICLE_STATUS = (
            ('Draft', '草稿'),
            ('Published', '已发布'),
            ('Deleted', '已删除')
        )
        ARTICLE_STATUS_DELETED = 'Deleted'
        ARTICLE_STATUS_PUBLISHED = 'Published'
        ARTICLE_STATUS_DRAFT = 'Draft'
    
        GENDERS = (
            ('Male', '男'),
            ('Female', '女'),
            ('Unknown', '未知'),
        )
        GENDERS_UNKNOWN = 'Unknown'
    
    2.1.2.3 Model部分

    这里需要说明几个点:

    1. 在各个表的定义中,通过内部类 Meta 可以定义模型类的元信息,比如表名db_table,排序方式ordering- 表示倒序。
    2. MPTT 模型有一个单独的内部类MPTTMeta,可以定义parent和排序字段order_insertion_by
    3. 对于多对多的关系,Django 提供了 ManyToMany的字段类型,这种会自动生成一张中间表用来记录两个表的多对多数据。
    4. 外键关联在定义的时候,需要指定唯一的related_name,以方便表与表的联合检索。
    5. 外键管理中on_delete,需要依据实际情况来确定是级联删除还是不做处理。

    blog/models.py下编写如下代码:

    import mptt.models
    from django.db import models
    
    from common.constants import Constant
    from common.models import AbstractBaseModel, User
    
    class Tag(AbstractBaseModel):
        name = models.CharField('标签名称', max_length=50, unique=True, null=False, blank=False)
    
        class Meta:
            db_table = 'blog_tag'
    
        def __str__(self):
            return self.name
    
    
    class Catalog(mptt.models.MPTTModel, AbstractBaseModel):
        name = models.CharField('分类名称', max_length=50, unique=True, null=False, blank=False)
        parent = mptt.models.TreeForeignKey('self', on_delete=models.CASCADE, null=True, blank=True,
                                            related_name='children')
    
        class Meta:
            db_table = 'blog_catalog'
    
        class MPTTMeta:
            order_insertion_by = ['name']
    
        def __str__(self):
            return self.name
    
    
    class Article(AbstractBaseModel):
        title = models.CharField('文章标题', max_length=100, unique=True, null=False, blank=False)
        cover = models.TextField('封面', max_length=1000, null=False, blank=False)
        excerpt = models.CharField('摘要', max_length=200, blank=True)
        keyword = models.CharField('关键词', max_length=200, blank=True)
        markdown = models.TextField('正文', max_length=100000, null=False, blank=False)
        status = models.CharField('文章状态', max_length=30, choices=Constant.ARTICLE_STATUS,
                                  default=Constant.ARTICLE_STATUS_DRAFT)
        catalog = models.ForeignKey(Catalog, verbose_name='所属分类', null=False, blank=False,
                                    on_delete=models.DO_NOTHING, related_name='cls_articles')
        tags = models.ManyToManyField(Tag, verbose_name='文章标签', blank=True, related_name='tag_articles')
    
        author = models.ForeignKey(User, verbose_name='作者', on_delete=models.DO_NOTHING, null=False, blank=False)
        views = models.PositiveIntegerField('浏览量', default=0, editable=False)
        comments = models.PositiveIntegerField('评论数量', default=0, editable=False)
        likes = models.PositiveIntegerField('点赞量', default=0, editable=False)
        words = models.PositiveIntegerField('字数', default=0, editable=False)
    
        class Meta:
            db_table = 'blog_article'
            ordering = ["-created_at"]
    
        def __str__(self):
            return self.title
    
    
    class Like(AbstractBaseModel):
        article = models.ForeignKey(Article, on_delete=models.DO_NOTHING, related_name='article_likes')
        user = models.ForeignKey(User, on_delete=models.DO_NOTHING, related_name='like_users')
    
        class Meta:
            db_table = 'blog_like'
    
    
    class Comment(AbstractBaseModel):
        article = models.ForeignKey(Article, verbose_name='评论文章', on_delete=models.DO_NOTHING,
                                    related_name='article_comments')
        user = models.ForeignKey(User, verbose_name='评论者', on_delete=models.DO_NOTHING, related_name='comment_users')
        reply = models.ForeignKey('self', verbose_name='评论回复', on_delete=models.CASCADE, related_name='comment_reply',
                                  null=True, blank=True)
        content = models.TextField('评论', max_length=10000, null=False, blank=False)
    
        class Meta:
            db_table = 'blog_comment'
    
    
    class Message(AbstractBaseModel):
        email = models.EmailField('邮箱', max_length=100, null=False, blank=False)
        content = models.TextField('内容', max_length=10000, null=False, blank=False)
        phone = models.CharField('手机', max_length=20, null=True, blank=True)
        name = models.CharField('姓名', max_length=30, null=True, blank=True)
        
        class Meta:
            db_table = 'blog_message'
    
    

    2.2 Serializer 层代码实现

    2.2.1 整理说明

    在使用Rest Framework 框架的时候,定义Serializer是使用这个框架最核心的内容,有几个点需要处理:

    1. 对一个模型,哪一些字段需要在API中作为入参,哪一些字段作为出参(通过 fields 定义)
    2. 对于接口,哪一些字段只读,哪一些字段可写
    3. 对于外键字段,如何序列化和反序列化,可以具体指定每一个字段的序列化方式
    4. 如何增加模型中没有出现的字段

    Rest Framework 本身提供了较多的支持方案,包括基Model的自动序列化方案和基于类的序列化方案,更多细节可以查看官网资料Serializers - Django REST framework

    2.2.2 代码实现

    对文章部分的定义,考虑到文章是整个博客的核心,所以对其序列化的方案,这里实现了三个版本ArticleListSerializerArticleSerializerArticleChangeStatusSerializer

    • ArticleListSerializer:用来对应列表查询,完成界面上的展示,可以更好的隔离读和写的权限
    • ArticleSerializer:用来完成新增,修改,删除,详情查看,通过集成ArticleListSerializer实现
    • ArticleChangeStatusSerializer:用来完成上线,下线操作,这两个接口只需要有限字段的入参和出参

    博客 App serializer 部分代码编写在blog/serializers.py 文件中,具体代码如下:

    from rest_framework import serializers
    
    from blog.models import Catalog, Tag, Article, Like, Message, Comment
    
    
    class CatalogSerializer(serializers.ModelSerializer):
        class Meta:
            model = Catalog
            fields = ['id', 'name', 'parent']
    
    
    class TagSerializer(serializers.ModelSerializer):
        class Meta:
            model = Tag
            fields = ['id', 'name', 'created_at', 'modified_at']
            extra_kwargs = {
                'created_at': {'read_only': True},
                'modified_at': {'read_only': True},
            }
    
    
    class ArticleListSerializer(serializers.ModelSerializer):
        tags_info = serializers.SerializerMethodField(read_only=True)
        catalog_info = serializers.SerializerMethodField(read_only=True)
        status = serializers.SerializerMethodField(read_only=True)
    
        class Meta:
            model = Article
            fields = ['id', 'title', 'excerpt', 'cover', 'created_at', 'modified_at', 'tags',
                      'tags_info', 'catalog', 'catalog_info', 'views', 'comments', 'words', 'likes', 'status', ]
    
            extra_kwargs = {
                'tags': {'write_only': True},
                'catalog': {'write_only': True},
                'views': {'read_only': True},
                'comments': {'read_only': True},
                'words': {'read_only': True},
                'likes': {'read_only': True},
                'created_at': {'read_only': True},
                'modified_at': {'read_only': True},
            }
    
        @staticmethod
        def get_tags_info(obj: Article) -> list:
            if not obj.title:
                article = Article.objects.get(id=obj.id)
                tags = article.tags.all()
            else:
                tags = obj.tags.all()
            return [{'id': tag.id, 'name': tag.name} for tag in tags]
    
        @staticmethod
        def get_catalog_info(obj: Article) -> dict:
            if not obj.catalog:
                book = Article.objects.get(id=obj.id)
                catalog = book.catalog
            else:
                catalog = obj.catalog
            return {
                'id': catalog.id,
                'name': catalog.name,
                'parents': [c.id for c in catalog.get_ancestors(include_self=True)]
            }
    
        @staticmethod
        def get_status(obj: Article) -> list:
            return obj.get_status_display()
    
    
    class ArticleSerializer(ArticleListSerializer):
        tags_info = serializers.SerializerMethodField(read_only=True)
        catalog_info = serializers.SerializerMethodField(read_only=True)
    
        class Meta(ArticleListSerializer.Meta):
            fields = ['markdown', 'keyword']
            fields.extend(ArticleListSerializer.Meta.fields)
    
    
    class ArticleChangeStatusSerializer(serializers.ModelSerializer):
        class Meta:
            model = Article
            fields = ['id', 'status', ]
            extra_kwargs = {
                'status': {'read_only': True},
            }
    
    
    class LikeSerializer(serializers.ModelSerializer):
        user_info = serializers.SerializerMethodField(read_only=True)
        article_info = serializers.SerializerMethodField(read_only=True)
    
        class Meta:
            model = Like
            fields = ['user', 'user_info', 'article', 'article_info', 'created_at']
            extra_kwargs = {
                'created_at': {'read_only': True},
            }
    
        @staticmethod
        def get_user_info(obj: Like) -> dict:
            if not obj.user:
                return {}
            else:
                user = obj.user
            return {'id': user.id, 'name': user.nickname or user.username, 'avatar': user.avatar}
    
        @staticmethod
        def get_article_info(obj: Like) -> dict:
            if not obj.article:
                return {}
            else:
                article = obj.article
            return {'id': article.id, 'title': article.title}
    
    
    class CommentSerializer(serializers.ModelSerializer):
        user_info = serializers.SerializerMethodField(read_only=True)
        article_info = serializers.SerializerMethodField(read_only=True)
        comment_replies = serializers.SerializerMethodField(read_only=True)
    
        class Meta:
            model = Comment
            fields = ['id', 'user', 'user_info', 'article', 'article_info', 'created_at', 'reply', 'content',
                      'comment_replies']
            extra_kwargs = {
                'created_at': {'read_only': True},
            }
    
        @staticmethod
        def get_user_info(obj: Comment) -> dict:
            if not obj.user:
                return {}
            else:
                user = obj.user
            return {'id': user.id, 'name': user.nickname or user.username, 'avatar': user.avatar}
    
        @staticmethod
        def get_article_info(obj: Comment) -> dict:
            if not obj.article:
                return {}
            else:
                article = obj.article
            return {'id': article.id, 'title': article.title}
    
        @staticmethod
        def get_comment_replies(obj: Comment):
            if not obj.comment_reply:
                return []
            else:
                replies = obj.comment_reply.all()
            return [{
                'id': reply.id,
                'content': reply.content,
                'user_info': {
                    'id': reply.user.id,
                    'name': reply.user.nickname or reply.user.username,
                    'avatar': reply.user.avatar,
                    'role': reply.user.role,
                },
                'created_at': reply.created_at
            } for reply in replies]
    
    
    class MessageSerializer(serializers.ModelSerializer):
        class Meta:
            model = Message
            fields = ['email', 'phone', 'name', 'content', 'created_at']
            extra_kwargs = {
                'created_at': {'read_only': True},
            }
    
    

    2.3 工具方法

    为了更好的复用代码逻辑,我们一般会抽象一些工具方法,主要是时间处理方法和上传相关的路径处理,在common/utils.py中编写如下代码:

    import os
    import random
    import string
    import time
    from datetime import datetime
    
    from django.conf import settings
    from django.template.defaultfilters import slugify
    
    
    def get_upload_file_path(upload_name):
        # Generate date based path to put uploaded file.
        date_path = datetime.now().strftime('%Y/%m/%d')
    
        # Complete upload path (upload_path + date_path).
        upload_path = os.path.join(settings.UPLOAD_URL, date_path)
        full_path = os.path.join(settings.BASE_DIR, upload_path)
        make_sure_path_exist(full_path)
        file_name = slugify_filename(upload_name)
        return os.path.join(full_path, file_name).replace('\\', '/'), os.path.join('/', upload_path, file_name).replace('\\', '/')
    
    
    def slugify_filename(filename):
        """ Slugify filename """
        name, ext = os.path.splitext(filename)
        slugified = get_slugified_name(name)
        return slugified + ext
    
    
    def get_slugified_name(filename):
        slugified = slugify(filename)
        return slugified or get_random_string()
    
    
    def get_random_string():
        return ''.join(random.sample(string.ascii_lowercase * 6, 6))
    
    
    def make_sure_path_exist(path):
        if os.path.exists(path):
            return
        os.makedirs(path, exist_ok=True)
    
    
    def format_time(dt: datetime, fmt: str = ''):
        fmt_str = fmt or '%Y-%m-%d %H:%M:%S'
        return dt.strftime(fmt_str)
    
    
    def get_year(dt: datetime) -> int:
        return dt.year
    
    
    def get_now() -> str:
        return format_time(datetime.now())
    
    
    def format_time_from_str(date_time_str: str, fmt: str = ''):
        fmt_str = fmt or '%Y-%m-%d %H:%M:%S'
        return datetime.strptime(date_time_str, fmt_str)
    
    
    def transform_time_to_str(t: int):
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t))
    
    

    2.4 ViewSet层代码实现

    2.4.1 安装依赖

    为了更好的实现在列表查询时的搜索条件识别和校验,我们安装一个新的库:django-filter

    pip install django-filter==2.4.0
    

    requirements.txt 中增加依赖信息

    django-filter==2.4.0
    

    2.4.2 通用 ViewSet 定义

    在处理接口层定义的时候,我们需要考虑接口的访问权限,分页,查询过滤条件,新增和修改时的操作人,用户角色判断等,这些处理是在每一个接口中都需要处理的,因此我们这里将这些逻辑统一抽象到一个基础类中完成,然后通过Python的多继承完成子类的能力扩充。

    1. 定义了BaseError,用于在出现各类业务校验不通过时抛出异常。
    2. 定义了BasePagination,用于列表查询接口的分页。
    3. 定义了BaseViewSetMixin类,作为常规ViewSet的基类,将分页、过滤条件、权限、操作者填充、用户身份判断等。
    4. 定义了ConstantViewSet类,用于将后端使用的常量提供给前端,用做前端需要判断枚举值之用。
    5. 定义了ImageUploadViewSet类,用于在新增文章时上传封面之用。

    common/views.py中增加如下代码:

    import logging
    
    import django.conf
    from django.contrib.auth import authenticate, login, logout as auth_logout
    from django.contrib.auth.hashers import make_password
    from django.contrib.auth.models import AnonymousUser
    from django.core.mail import send_mail
    from django.db.models import QuerySet
    from django_filters.rest_framework import DjangoFilterBackend
    from rest_framework import viewsets, permissions, status
    from rest_framework.exceptions import ValidationError
    from rest_framework.generics import GenericAPIView
    from rest_framework.pagination import PageNumberPagination
    from rest_framework.response import Response
    from rest_framework.views import APIView
    
    from common.constants import Constant
    from common.models import User
    from common.serializers import UserSerializer, UserLoginSerializer, UserPasswordSerializer
    from common.utils import get_upload_file_path
    
    
    def get_random_password():
        import random
        import string
        return ''.join(random.sample(string.ascii_letters + string.digits + string.punctuation, 8))
    
    
    class BaseError(ValidationError):
        def __init__(self, detail=None, code=None):
            super(BaseError, self).__init__(detail={'detail': detail})
    
    
    class BasePagination(PageNumberPagination):
        """
            customer pagination
        """
        # default page size
        page_size = 10
        # page size param in page size
        page_size_query_param = 'page_size'
        # page param in api
        page_query_param = 'page'
        # max page size
        max_page_size = 100
    
    
    class BaseViewSetMixin(object):
        pagination_class = BasePagination
        filter_backends = [DjangoFilterBackend]
        permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    
        def __init__(self, **kwargs):
            super(BaseViewSetMixin, self).__init__(**kwargs)
            self.filterset_fields = []
            self.init_filter_field()
    
        def init_filter_field(self):
            """
            Init filter field by the fields' intersection in model and serializer
            e.g. `book/?id=1&authors=2`
            :return:  None
            """
            serializer = self.get_serializer_class()
            if not hasattr(serializer, 'Meta'):
                return
            meta = serializer.Meta
    
            if not hasattr(meta, 'model'):
                return
            model = meta.model
    
            if not hasattr(meta, 'fields'):
                ser_fields = []
            else:
                ser_fields = meta.fields
    
            for field in ser_fields:
                if not hasattr(model, field):
                    continue
                self.filterset_fields.append(field)
    
        def perform_update(self, serializer):
            user = self.fill_user(serializer, 'update')
            return serializer.save(**user)
    
        def perform_create(self, serializer):
            user = self.fill_user(serializer, 'create')
            return serializer.save(**user)
    
        @staticmethod
        def fill_user(serializer, mode):
            """
            before save, fill user info into para from session
            :param serializer: Model's serializer
            :param mode: create or update
            :return: None
            """
            request = serializer.context['request']
    
            user_id = request.user.id
            ret = {'modifier': user_id}
    
            if mode == 'create':
                ret['creator'] = user_id
            return ret
    
        def get_pk(self):
            if hasattr(self, 'kwargs'):
                return self.kwargs.get('pk')
    
        def is_reader(self):
            return isinstance(self.request.user, AnonymousUser) or not self.request.user.is_superuser
    
    
    class BaseModelViewSet(BaseViewSetMixin, viewsets.ModelViewSet):
        pass
    
    
    class UserViewSet(viewsets.ModelViewSet):
        queryset = User.objects.all().order_by('username')
        serializer_class = UserSerializer
        permission_classes = [permissions.AllowAny]
    
    
    class UserLoginViewSet(GenericAPIView):
        permission_classes = [permissions.AllowAny]
        serializer_class = UserLoginSerializer
        queryset = User.objects.all()
    
        def post(self, request, *args, **kwargs):
            username = request.data.get('username', '')
            password = request.data.get('password', '')
    
            user = authenticate(username=username, password=password)
            if user is not None and user.is_active:
                login(request, user)
                serializer = UserSerializer(user)
                return Response(serializer.data, status=200)
            else:
                ret = {'detail': 'Username or password is wrong'}
                return Response(ret, status=403)
    
    
    class UserLogoutViewSet(GenericAPIView):
        permission_classes = [permissions.IsAuthenticated]
        serializer_class = UserLoginSerializer
    
        def get(self, request, *args, **kwargs):
            auth_logout(request)
            return Response({'detail': 'logout successful !'})
    
    
    class PasswordUpdateViewSet(GenericAPIView):
        permission_classes = [permissions.IsAuthenticated]
        serializer_class = UserPasswordSerializer
        queryset = User.objects.all()
    
        def post(self, request, *args, **kwargs):
            user_id = request.user.id
            password = request.data.get('password', '')
            new_password = request.data.get('new_password', '')
            user = User.objects.get(id=user_id)
            if not user.check_password(password):
                ret = {'detail': 'old password is wrong !'}
                return Response(ret, status=403)
    
            user.set_password(new_password)
            user.save()
            return Response({
                'detail': 'password changed successful !'
            })
    
        def put(self, request, *args, **kwargs):
            """
            Parameter: username->user's username who forget old password
            """
            username = request.data.get('username', '')
            users = User.objects.filter(username=username)
            user: User = users[0] if users else None
    
            if user is not None and user.is_active:
                password = get_random_password()
    
                try:
                    send_mail(subject="New password for Blog site",
                              message="Hi: Your new password is: \n{}".format(password),
                              from_email=django.conf.settings.EMAIL_HOST_USER,
                              recipient_list=[user.email],
                              fail_silently=False)
                    user.password = make_password(password)
                    user.save()
                    return Response({
                        'detail': 'New password will send to your email!'
                    })
                except Exception as e:
                    print(e)
                    return Response({
                        'detail': 'Send New email failed, Please check your email address!'
                    })
            else:
                ret = {'detail': 'User does not exist(Account is incorrect !'}
                return Response(ret, status=403)
    
    
    class ConstantViewSet(GenericAPIView):
        permission_classes = [permissions.IsAuthenticated]
        serializer_class = UserPasswordSerializer
        queryset = QuerySet()
    
        def get(self, request, *args, **kwargs):
            ret = {}
            for key in dir(Constant):
                if not key.startswith("_"):
                    ret[key] = getattr(Constant, key)
            return Response(ret)
    
    
    class ImageUploadViewSet(APIView):
        permission_classes = [permissions.AllowAny]
    
        def post(self, request, *args, **kwargs):
    
            try:
                if request.method == 'POST' and request.FILES:
                    uploaded_file = request.FILES['file']
    
                    full_file_path, file_path = get_upload_file_path(uploaded_file.name)
                    self.handle_uploaded_file(uploaded_file, full_file_path)
    
                    response = {
                        'url': file_path
                    }
                    return Response(response)
    
            except Exception as e:
                logging.getLogger('default').error(e, exc_info=True)
                raise BaseError(detail='Upload failed', code=status.HTTP_500_INTERNAL_SERVER_ERROR)
    
        @staticmethod
        def handle_uploaded_file(f, file_path):
            destination = open(file_path, 'wb+')
            for chunk in f.chunks():
                destination.write(chunk)
            destination.close()
    
    

    2.4.3 Blog 相关的 ViewSet 定义

    这里的ViewSet类通过继承框架提供的基类或者我们自己封装的BaseViewSet类,来实现对应的业务接口,如果是非常传统的 CURD 接口,在ViewSet里面可能仅仅只需要定义 queryset的属性就可以完成新增,修改,删除,详情查询,列表查询的接口。

    可以看到我们的 Article 对象相关的ViewSet7 个,主要是兼顾了文章在各种维度下的查询和管理需求,比如我们需要按照时间对文章进行查询,需要通过浏览量倒序展示文章列表,需要上线,下架文章,需要不登录就能浏览文章,需要登录管理员才能管理文章等各种需求。

    具体代码如下:

    import datetime
    
    from common.utils import get_year
    from django.db.models import QuerySet, Sum, Count
    from rest_framework import mixins
    from rest_framework.response import Response
    from rest_framework.viewsets import GenericViewSet
    
    from blog.models import Article, Comment, Message, Tag, Catalog, Like
    from blog.serializers import ArticleSerializer, CommentSerializer, MessageSerializer, TagSerializer, \
        ArticleListSerializer, CatalogSerializer, ArticleChangeStatusSerializer, LikeSerializer
    from common.constants import Constant
    from common.views import BaseModelViewSet, BaseViewSetMixin
    
    
    class ArticleArchiveListViewSet(BaseViewSetMixin, mixins.ListModelMixin, GenericViewSet):
        queryset = Article.objects.all()
        serializer_class = ArticleListSerializer
    
        def filter_queryset(self, queryset) -> QuerySet:
            queryset = super(ArticleArchiveListViewSet, self).filter_queryset(queryset)
            if self.is_reader():
                queryset = queryset.exclude(status=Constant.ARTICLE_STATUS_DRAFT)
            return queryset.exclude(status=Constant.ARTICLE_STATUS_DELETED)
    
        def list(self, request, *args, **kwargs):
            queryset = self.filter_queryset(self.get_queryset())
            total = len(queryset)
            page_size, page_number = self.get_page_info()
            start_year, end_year = self.get_datetime_range(page_size, page_number)
            queryset = queryset.filter(created_at__gte=start_year).filter(created_at__lt=end_year)
            ret = {
                "count": total,
                "next": None,
                "previous": None,
                'results': []
            }
            years = {}
            for article in queryset.all():
                year = article.created_at.year
                articles = years.get(year)
                if not articles:
                    articles = []
                    years[year] = articles
                serializer = self.get_serializer(article)
                articles.append(serializer.data)
            for key, value in years.items():
                ret['results'].append({
                    'year': key,
                    'list': value
                })
            ret['results'].sort(key=lambda i: i['year'], reverse=True)
            return Response(ret)
    
        def get_page_info(self):
            page_size = self.paginator.get_page_size(self.request)
            page_number = self.request.query_params.get(self.paginator.page_query_param, 1)
            return page_size, int(page_number)
    
        @staticmethod
        def get_datetime_range(page_size, page_number):
            current_year = get_year(datetime.datetime.now())
            start_year = current_year - page_size * page_number + 1
            start_datetime = '{:d}-01-01 00:00:00'.format(start_year)
            end_datetime = '{:d}-01-01 00:00:00'.format(start_year + page_size)
            return start_datetime, end_datetime
    
    
    class ArticleListViewSet(BaseViewSetMixin, mixins.ListModelMixin,
                             GenericViewSet):
        queryset = Article.objects.all().select_related('catalog', 'author')
        serializer_class = ArticleListSerializer
    
        def filter_queryset(self, queryset):
            self.filterset_fields.remove('catalog')
            queryset = super(ArticleListViewSet, self).filter_queryset(queryset)
            if self.is_reader():
                queryset = queryset.exclude(status=Constant.ARTICLE_STATUS_DRAFT)
            params = self.request.query_params
            if 'catalog' in params:
                catalog_id = params.get('catalog', 1)
                catalog = Catalog.objects.get(id=catalog_id)
                catalogs = catalog.get_descendants(include_self=True)
                queryset = queryset.filter(catalog__in=[c.id for c in catalogs])
            return queryset.exclude(status=Constant.ARTICLE_STATUS_DELETED)
    
    
    class ArticleViewSet(BaseViewSetMixin,
                         mixins.CreateModelMixin,
                         mixins.RetrieveModelMixin,
                         mixins.UpdateModelMixin,
                         mixins.DestroyModelMixin,
                         GenericViewSet):
        queryset = Article.objects.all()
        serializer_class = ArticleSerializer
    
        def perform_create(self, serializer):
            extra_infos = self.fill_user(serializer, 'create')
            extra_infos['author'] = self.request.user
            serializer.save(**extra_infos)
    
        def filter_queryset(self, queryset):
            queryset = super(ArticleViewSet, self).filter_queryset(queryset)
            if self.is_reader():
                queryset = queryset.exclude(status=Constant.ARTICLE_STATUS_DRAFT).exclude(
                    status=Constant.ARTICLE_STATUS_DELETED)
            return queryset
    
        def perform_destroy(self, instance: Article):
            instance.status = Constant.ARTICLE_STATUS_DELETED
            instance.save()
    
        def retrieve(self, request, *args, **kwargs):
            instance: Article = self.get_object()
            serializer = self.get_serializer(instance)
            if self.is_reader():
                instance.views += 1
                instance.save()
            return Response(serializer.data)
    
    
    class ArticlePublishViewSet(BaseViewSetMixin,
                                mixins.UpdateModelMixin,
                                GenericViewSet):
        queryset = Article.objects.all()
        serializer_class = ArticleChangeStatusSerializer
    
        def filter_queryset(self, queryset):
            queryset = super(ArticlePublishViewSet, self).filter_queryset(queryset)
            return queryset.exclude(status=Constant.ARTICLE_STATUS_DELETED)
    
        def perform_update(self, serializer):
            extra_infos = self.fill_user(serializer, 'update')
            extra_infos['status'] = Constant.ARTICLE_STATUS_PUBLISHED
            serializer.save(**extra_infos)
    
    
    class ArticleOfflineViewSet(ArticlePublishViewSet):
        def perform_update(self, serializer):
            extra_infos = self.fill_user(serializer, 'update')
            extra_infos['status'] = Constant.ARTICLE_STATUS_DRAFT
            serializer.save(**extra_infos)
    
    
    class CommentViewSet(BaseModelViewSet):
        queryset = Comment.objects.all()
        serializer_class = CommentSerializer
    
        def filter_queryset(self, queryset):
            queryset = super(CommentViewSet, self).filter_queryset(queryset)
            return queryset.filter(reply__isnull=True)
    
        def perform_create(self, serializer):
            super(CommentViewSet, self).perform_create(serializer)
            article: Article = serializer.validated_data['article']
            article.comments += 1
            article.save()
    
    
    class LikeViewSet(BaseModelViewSet):
        queryset = Like.objects.all()
        serializer_class = LikeSerializer
    
        def perform_create(self, serializer):
            super(LikeViewSet, self).perform_create(serializer)
            article: Article = serializer.validated_data['article']
            article.likes += 1
            article.save()
    
    
    class MessageViewSet(BaseModelViewSet):
        queryset = Message.objects.all()
        serializer_class = MessageSerializer
    
    
    class TagViewSet(BaseModelViewSet):
        queryset = Tag.objects.all()
        serializer_class = TagSerializer
    
    
    class CatalogViewSet(BaseModelViewSet):
        queryset = Catalog.objects.all()
        serializer_class = CatalogSerializer
    
        def list(self, request, *args, **kwargs):
            ret = []
            roots = Catalog.objects.filter(id=1).filter(parent__isnull=True)
            if not roots:
                return Response(ret)
            root: Catalog = roots[0]
            root_dict = CatalogSerializer(root).data
            root_dict['children'] = []
            ret.append(root_dict)
            parent_dict = {root.id: root_dict}
            for cls in root.get_descendants():
                data = CatalogSerializer(cls).data
    
                parent_id = data.get('parent')
                parent = parent_dict.get(parent_id)
                parent['children'].append(data)
    
                if not cls.is_leaf_node() and cls.id not in parent_dict:
                    data['children'] = []
                    parent_dict[cls.id] = data
            return Response(ret)
    
    
    class NumberViewSet(BaseViewSetMixin,
                        mixins.ListModelMixin,
                        GenericViewSet):
        queryset = Article.objects.all()
        serializer_class = ArticleListSerializer
    
        def list(self, request, *args, **kwargs):
            queryset = self.get_queryset().aggregate(Sum('views'), Sum('likes'), Sum('comments'))
            messages = Message.objects.aggregate(Count('id'))
    
            return Response({
                'views': queryset['views__sum'],
                'likes': queryset['likes__sum'],
                'comments': queryset['comments__sum'],
                'messages': messages['id__count']
            })
    
    
    class TopArticleViewSet(NumberViewSet):
        def list(self, request, *args, **kwargs):
            queryset = self.filter_queryset(self.get_queryset()).order_by('-views')[:10]
    
            page = self.paginate_queryset(queryset)
            if page is not None:
                serializer = self.get_serializer(page, many=True)
                return self.get_paginated_response(serializer.data)
    
            serializer = self.get_serializer(queryset, many=True)
            return Response(serializer.data)
    
    

    2.5 定义URL

    2.5.1 安装API文档自动生成依赖

    通过工具,可以自动生成基于Restful的接口说明。

    pip install drf-yasg==1.20.0
    

    修改requirements.txt,增加如下:

    drf-yasg==1.20.0
    

    2.5.2 common 下的 urls.py

    修改common/urls.py,最终代码如下代码:

    from django.conf.urls import url
    from django.urls import include, path
    from rest_framework import routers
    
    from common import views
    from common.views import ImageUploadViewSet
    
    router = routers.DefaultRouter()
    router.register('user', views.UserViewSet)
    
    app_name = 'common'
    
    urlpatterns = [
        path('', include(router.urls)),
        url(r'^user/login', views.UserLoginViewSet.as_view()),
        url(r'^user/logout', views.UserLogoutViewSet.as_view()),
        url(r'^user/pwd', views.PasswordUpdateViewSet.as_view()),
        url(r'^dict', views.ConstantViewSet.as_view()),
        url(r'upload/$', ImageUploadViewSet.as_view()),
    ]
    

    2.5.3 blog 下的 urls.py

    blog/urls.py中编写如下代码:

    from django.urls import include, path
    from rest_framework import routers
    
    from blog import views
    
    router = routers.DefaultRouter()
    router.register('article', views.ArticleViewSet)
    router.register('list', views.ArticleListViewSet)
    router.register('publish', views.ArticlePublishViewSet)
    router.register('offline', views.ArticleOfflineViewSet)
    router.register('archive', views.ArticleArchiveListViewSet)
    router.register('tag', views.TagViewSet)
    router.register('catalog', views.CatalogViewSet)
    router.register('comment', views.CommentViewSet)
    router.register('like', views.LikeViewSet)
    router.register('message', views.MessageViewSet)
    router.register('number', views.NumberViewSet)
    router.register('top', views.TopArticleViewSet)
    
    app_name = 'blog'
    
    urlpatterns = [
        path('', include(router.urls)),
    ]
    

    2.5.4 project下的urls.py

    这里我们使用drf_yasg提供的方法,自动生成接口说明文档,在实际的前后端分类的项目中,这是非常有用的一个工具,可以让前端和后端基于接口约定并行开发,保证前端和后端的开发效率。

    修改project/urls.py,最终代码如下:

    from django.conf import settings
    from django.conf.urls import url
    from django.urls import path, re_path, include
    from django.views.generic import RedirectView
    from django.views.static import serve
    from drf_yasg import openapi
    from drf_yasg.views import get_schema_view
    from rest_framework import permissions
    
    schema_view = get_schema_view(
        openapi.Info(
            title="Blog System API",
            description="Blog site ",
            default_version='v1',
            terms_of_service="",
            contact=openapi.Contact(email="XXXX@163.com"),
            license=openapi.License(name="GPLv3 License"),
        ),
        public=True,
        permission_classes=(permissions.AllowAny,),
    )
    
    urlpatterns = [
        path('', include('blog.urls', namespace='blog')),
        path('', include('common.urls', namespace='common')),
        url(r'^favicon.ico$', RedirectView.as_view(url=r'static/img/favicon.ico')),
        url(r'upload/(?P<path>.*)', serve, {'document_root': settings.MEDIA_ROOT}),
        path('api-auth/', include('rest_framework.urls', namespace='rest_framework')),
        re_path(
            r"api/swagger(?P<format>\.json|\.yaml)",
            schema_view.without_ui(cache_timeout=0),
            name="schema-json",
        ),
        path(
            "swagger/",
            schema_view.with_ui("swagger", cache_timeout=0),
            name="schema-swagger-ui",
        ),
        path("docs/", schema_view.with_ui("redoc", cache_timeout=0), name="schema-redoc"),
    ]
    
    

    2.6 配置调整

    2.6.1 调整project/setting.py

    project/setting.pyINSTALLED_APPS 中增加blogdrf_yasgdjango_filters,如果不添加,则会出现模板路径找不到的问题

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'rest_framework',
        'drf_yasg',
        'django_filters',
        'common',
        'blog'
    ]
    

    project/setting.pyTEMPLATES 调整为:

    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, 'templates')],
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]
    

    project/setting.py 增加关于媒体文件和上传路径的配置

    MEDIA_ROOT = os.path.join(BASE_DIR, 'upload')
    MEDIA_URL = "/upload/"
    UPLOAD_URL = 'upload'
    STATICFILES_DIRS = (
        os.path.join(BASE_DIR, 'upload'),
    )
    
    

    2.6.2 执行模型迁移

    python manage.py makemigrations
    python manage.py migrate
    

    到此为止个人博客后端部分开发完成。这里面实际也包含了访客在博客网站上能够访问网站所需要的接口。

    3、实现效果展示

    3.1 访问API文档

    在浏览器中访问http://127.0.0.1:8000/swagger/,效果如下图:

    image-20210818083857068

    3.2 后端文件夹结构

    image-20210822221626409

    相关文章

      网友评论

          本文标题:Vue3+TypeScript+Django Rest Fram

          本文链接:https://www.haomeiwen.com/subject/fhxdiltx.html