美文网首页@IT·互联网技术研发汇集
python flask框架搭建一个个人博客

python flask框架搭建一个个人博客

作者: 星辰大海的精灵 | 来源:发表于2024-02-29 09:35 被阅读0次

    轻量级框架被广泛应用于中小型的网站开发,不同于javaweb,flask简单易懂适合快速上手,通过以下简单实例制作一个简易网站。

    开发前准备

    开发技术应用的技术:

    前端:jQuery+bootstrap+Ajax

    后端:flask+jinjia2+SQLAchemy+pymysql

    源码和数据库数据

    数据库数据是mysql,数据sql文件在文章最后喔!

    工具包通过pip安装即可

    pip install sqlalchemy

    如果需要安装对应版本的库则用pip install 库名==版本号

    pip install sqlalchemy==1.4.36

    如果出现pip命令报错的现象,跟换pip的下载源移步pip换源。建议均跟换下载源,国外的下载很慢。

    # 切换为清华源

    pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

    当然也可以不用pip下载,在pycharm中编写代码后通过IDE下载,如下

    点击install package pymysql直接安装。

    注意这个项目是好久之前的项目了,用的技术比较旧,建议去flask官网学习最新的技术再移植https://www.osgeo.cn/flask/

    本次使用的库及版本如下:

    项目的目录结构如下:

    启动后的页面如下:

    登录前只能看文章,登陆后有个人中心,积分,账户,发表文章等功能:

    页面有点丑哈,这是老早期的库存了。本来不打算更的,开了要不然就丢了,还是写一篇吧

    common目录

    # create_connect.py

    from sqlalchemy import MetaData

    #每个类都要继承flask-SQLAlchemy的db.Mdeol类,封装操作

    def extendsModel():

        from main import db

        FactorySession=db.session

        FactoryModel=db.Model

        md=MetaData(bind=db.engine)

        return (FactorySession,FactoryModel,md)

    # indentify.py

    import random,string

    class Figurecode:

        def randomcode(self):

            code=random.sample("abcdefghijklmnopqrstuvwxyz123456789",4)

            return ''.join(code)

    # index.py

    from os import abort

    from flask import Blueprint, render_template, jsonify, session

    from common.indentify import Figurecode

    from model.tt_article import Article

    from model.tt_user import User

    from model.tt_favorite import Favorite

    import math,re

    index=Blueprint("index",__name__)

    #自定义函数

    def function_cost(username,cost:int):

        user=User()

        user.buy_article_with_credit(username,cost)

        return index.jinja_env.globals.update(buy=function_cost(username,cost))

    """

    #实现jinja2前端渲染(Blueprint内置)将其注册到html中就不用再注册了没使用到注册

    @index.context_processor

    def jinja2_function():

        pass

    """

    #定义主页路由

    @index.route('/')        #定义路由模板页面

    def home():

        article=Article()

        result=article.find_with_nickname(0,5)

        total_page = math.ceil(article.find_view_artilce_count() / 5)

        #print(result1)

        new, max, random=article.get_three_result()

        return render_template('display.html',result=result,total=total_page,page=1,new=new,max=max,random=random)  #将数据传到前端

    #定义文章分页路由

    @index.route('/page/<int:page>')

    def pagelimit(page):

        start=(page-1)*5

        article=Article()

        result=article.find_with_nickname(start,5)

        total_page=math.ceil(article.find_view_artilce_count()/5)

        new, max, random = article.get_three_result()

        return render_template('display.html',result=result,page=page,total=total_page,new=new,max=max,random=random)  #page是当前页参数

    @index.route('/type/<int:type>-<int:page>')    #实现类别并分页

    def articletypelimit(type,page):

        article=Article()

        start=(page-1)*5

        result1=article.find_article_by_type(type,start,5)

        paing_count=math.ceil(article.get_diffrent_articelcount_by_type(type)/5)

        new, max, random = article.get_three_result()

        return render_template('article.html',result=result1,paing=paing_count,page=page,type=type,new=new,max=max,random=random)

    @index.route('/search/<int:page>-<keyword>')

    def searchheadline(page,keyword):

        keyword=keyword.strip()

        if len(keyword)==0 or len(keyword)>15 :

            abort(404)

        start=(page-1)*5

        article=Article()

        result1=article.serach_by_headline(keyword,start,5)

        total=math.ceil(article.getcount_by_headline(keyword)/5)

        new, max, random = article.get_three_result()

        return render_template('search.html',result=result1,paing_count=total,keyword=keyword,page=page,new=new,max=max,random=random)

    @index.route('/index4')

    def function4():

        return render_template('index4.html')

    @index.route('/index3')

    def function3():

        return render_template('write.html')

    @index.route('/article/<int:id>')  #阅读界面

    def function2(id):

        article=Article()

        result1=article.get_article_by_id(id)

        if result1 is None:

            abort(404)

        article.update_read_count(id)

        new, max, random = article.get_three_result()

        favorite=Favorite()

        art_collect=favorite.if_collect(id,session.get('userid'))

        return render_template('read.html',result=result1,new=new,max=max,random=random,collect=art_collect)

    """

    @index.route('/test')  #实现前端渲染的路由

    def function5():

        '''

        article=Article()

        new,max,random=article.get_three_result()

        list=[]

        list.append(dict(new))

        list.append(dict(max))

        list.append(dict(random))

        return  jsonify(list)    #特定的数据结构才能转化为了json

        '''

    """

    # useregister.py

    from flask import Blueprint, request, session, make_response, json, redirect, url_for, render_template

    from common.indentify import *

    from model.tt_favorite import Favorite

    from model.comment import *

    from model.tt_user import User

    user = Blueprint('user', __name__)

    @user.route('/code')

    def get_code():

        figurecode = Figurecode()

        code = figurecode.randomcode()

        response = make_response(code)

        session['code'] = code.lower()

        return response

    @user.route('/register', methods=['POST'])

    def register():

        user = User()

        username = request.form.get('username')

        password = request.form.get('password')

        code = request.form.get('code')

        if user.identify_user_if_same(username) is False:

            return "USERNAME_IS_EXIST"

        elif code == session.get('code'):

            return 'CODE_ERROR'

        else:

            result = user.recive_registic(username, password)

            session['username'] = username

            session['islogin'] = 'true'

            session['userid'] = result.userid

            session['nickname'] = result.nickname

            session['role'] = result.role

            return 'OK'

    @user.route('/login', methods=['POST'])

    def login():

        user = User()

        username = request.form.get('username').strip()

        password = request.form.get('password').strip()

        code = request.form.get('code')

        print(username, password, code)

        result = user.identify_user_is_true(username, password)

        # if code != session.get('code'):

        #    return 'CODE ERROR'

        print(result,len(result))

        if len(result) == 0:

            return 'USER UNEXIST'

        elif result[0].password != password:

            return 'PASSWORD ERROR'

        else:

            user.add_credit_once(username)

            session['islogin'] = 'true'

            session['userid'] = result[0].userid

            session['username'] = result[0].username

            session['nickname'] = result[0].nickname

            session['role'] = result[0].role

            return 'LOGIN'

    @user.route('/logout')

    def logout():

        session.clear()    # 清空session,页面跳转,这里重定位

        return redirect('/')#url_for('/')

        #return render_template('display.html')

    #定义文章消耗路由

    @user.route('/article_cost',methods=['POST'])

    def articlecost():

        user=User()

        username = request.form.get('username')

        cost = int(request.form.get('cost'))

        user.buy_article_with_credit(username,cost)

        return 'OK'

    #定义文章收藏

    @user.route('/article_favorite',methods=['POST'])

    def article_favorite():

        favorite=Favorite()

        artid=request.form.get('articleid')

        userid=request.form.get('userid')

        favorite.insert_favorite(artid,userid)

        return 'OK'

    #新增评论

    @user.route('/add_comments',methods=['POST'])

    def add_comments():

        comment=Comment()

        article_id=request.form.get('articleid')

        comments=request.form.get('content')

        if len(comments)<5 or len(comments)>200:

            return 'ERROR'

        else:

            comment.insert_commnet(article_id, comments)

            return 'OK'

    # comment.py

    from flask import session

    from sqlalchemy import Table

    from common.create_connect import extendsModel

    import time

    FactorySession,FactoryModel,md=extendsModel()

    class Comment(FactoryModel):

        __table__=Table("comment",md,autoload=True)

        #新增一条评论

        def insert_commnet(self,article_id,comments):

            now = time.strftime('%Y-%m-%d')

            comment=Comment(userid=session.get("userid"),articleid=article_id,content=comments,updatetime=now)

            FactorySession.add(comment)

            FactorySession.commit()

        #根据文章id查询所有评论

        def find_comments_by_articelid(self,articleid):

            result=FactorySession.query(Comment).filter(Comment.articleid==articleid,Comment.hide==0).all()

            return result

    """

    if __name__=='__main__':

        comment=Comment()

        print(comment.find_comments_by_articelid(7))

    """

    # tt_article.py

    from sqlalchemy import Table, func

    from model.tt_user import User

    from common.create_connect import extendsModel

    FactorySession,FactoryModel,md=extendsModel()

    class Article(FactoryModel):

        __table__=Table('tt_article',md,autoload=True)

        # 定义操作

        #指定分页limit限制和最新显示offset  select * from user limit param1  offset param2 ;  从(param2+1)条数据开始,取 param1条数据

        def find_with_nickname(self,start,count):

            result=FactorySession.query(Article,User.nickname).join(User,User.userid==Article.userid)\

                .filter(Article.hidden==0,Article.drafted==1,Article.checked==1)\

                .order_by(Article.articleid.asc()).limit(count).offset(start).all()

            return result  #将数据库表映射到对象

        #获取文章数量

        def find_view_artilce_count(self):

            count=FactorySession.query(Article).filter(Article.hidden==0,Article.drafted==1,Article.checked==1).count()

            return count

        #根据文章类型获取文章

        def find_article_by_type(self,type,start,count):

            result=FactorySession.query(Article,User.nickname).filter(Article.hidden==0,Article.drafted==1,Article.checked==1,Article.type==type)\

                .order_by(Article.articleid.asc()).limit(count).offset(start).all()      #按文章类型将所有文章查出来

            return result

        #根据文章类型获取不同文章类型的数量

        def get_diffrent_articelcount_by_type(self,type):

            articel_count = FactorySession.query(Article).filter(Article.hidden == 0, Article.drafted == 1,

                                                        Article.checked == 1,Article.type==type).count()

            return articel_count

        #根据文章标题搜索

        def serach_by_headline(self,headline,start,count):

            result = FactorySession.query(Article, User.nickname).filter(Article.hidden == 0, Article.drafted == 1,

                                                                        Article.checked == 1, Article.headline.like('%'+headline+'%') ) \

                .order_by(Article.articleid.asc()).limit(count).offset(start).all()  # 按文章类型将所有文章查出来

            return result

        #统计搜索标题的总数量

        def getcount_by_headline(self,headline):

            articel_count = FactorySession.query(Article).filter(Article.hidden == 0, Article.drafted == 1,

                                                                Article.checked == 1, Article.headline.like('%'+headline+'%')).count()

            return articel_count

        #最新文章推荐

        def get_new_article(self):  #倒叙输出文章

            result=FactorySession.query(Article.articleid,Article.headline)\

                .filter(Article.hidden==0,Article.drafted==1,Article.checked==1,Article.recommended==1)\

                .order_by(Article.articleid.desc()).limit(9).all()

            return result

        #最多阅读文章

        def get_max_readed_articel(self):

            result = FactorySession.query(Article.articleid, Article.headline) \

                .filter(Article.hidden == 0, Article.drafted == 1, Article.checked == 1, Article.recommended == 1) \

                .order_by(Article.readcount.desc()).limit(9).all()  # 倒叙输出文章

            return result

        #特别推荐从推荐随机生成文章

        def get_random_article(self):

            result = FactorySession.query(Article.articleid, Article.headline) \

                .filter(Article.hidden == 0, Article.drafted == 1, Article.checked == 1, Article.recommended == 1) \

                .order_by(func.rand()).limit(9).all()  # 倒叙输出文章

            return result

        #将上述三个结果一次返回

        def get_three_result(self):

            new= self.get_new_article()

            max=self.get_max_readed_articel()

            random=self.get_random_article()

            return new,max,random

        #根据文章id搜索文章

        def get_article_by_id(self,id):

            result = FactorySession.query(Article).filter(Article.hidden == 0, Article.drafted == 1,

                                                                Article.checked == 1, Article.articleid == id).all()

            return result

        #每阅读一次文章阅读量加一

        def update_read_count(self,articleid):

            result = FactorySession.query(Article).filter(Article.articleid==articleid).first()

            result.readcount += 1

            FactorySession.commit()

        '''

        def test_function(self):

            result = FactorySession.query(Article.headline).filter(Article.userid == 3).first()

            return result

        '''

    """测试函数

    if __name__=='__main__':

        article = Article()

        #result1 = article.get_diffrent_articelcount_by_type(4)

        #result1=article.test_function()

        #result1=article.get_article_by_id(50)

        #result1=article.get_new_article()

        #print(result1)

        #result=article.update_read_count(1)

        print(article.get_random_article()[0])

    """

    # tt_favorite.py

    #引入flask-SQLAlchemy的封装类

    from flask import session

    from sqlalchemy import Table

    from common.create_connect import extendsModel

    #对flask-SQLAlchemy的封装类实例化

    FactorySession,FactoryModel,md=extendsModel()

    class Favorite(FactoryModel):

        __table__=Table('tt_favorite',md,autoload=True)

        #插入收藏id

        def insert_favorite(self,articleid,userid):

            result=FactorySession.query(Favorite).filter(Favorite.articleid==articleid,Favorite.userid==userid).all()

            if len(result) !=0 :

                result[0].canceled = 1

            else:

                favorite=Favorite(articleid=articleid,userid=userid,canceled=0)

                FactorySession.add(favorite)

            FactorySession.commit()

        #判断文章是否收藏

        def if_collect(self,articleid,userid):

            result=FactorySession.query(Favorite).filter(Favorite.articleid==articleid,Favorite.userid==userid).all()

            if len(result)==0:

                return "收藏"

            else:

                if result[0].canceled ==1:

                    return "收藏"

                else:

                    return "已收藏"

    '''

    if __name__=='__main__':

        f=Favorite()

        #f.insert_favorite(20,2)

        print(f.if_collect(20,51))

    '''

    #引入flask-SQLAlchemy的封装类

    import time

    from sqlalchemy import Table

    from common.create_connect import extendsModel

    #对flask-SQLAlchemy的封装类实例化

    FactorySession,FactoryModel,md=extendsModel()

    class User(FactoryModel):

        __table__=Table('tt_user',md,autoload=True)

        def identify_user_if_same(self,username):

            result=FactorySession.query(User.username).filter(User.username==username).all()

            if len(result) ==0:

                return True

            else:

                return False

        def identify_user_is_true(self,username,password):

            result=FactorySession.query(User).filter(User.username==username,User.password==password).all()

            return result

        #注册

        def recive_registic(self,username,password):

            now=time.strftime('%Y-%m-%d')

            user=User(username=username,password=password,role='user',credit=50,nickname="用户",avatar='one.png',createtime=now)

            FactorySession.add(user)

            FactorySession.commit()

            return user

        #每登录一次积分加一

        def add_credit_once(self,username):

            user=FactorySession.query(User).filter(User.username==username).first()

            user.credit += 1

            FactorySession.commit()

        def buy_article_with_credit(self,username,cost:int):

            user=FactorySession.query(User).filter(User.username==username).first()

            user.credit -= cost

            FactorySession.commit()

    '''

    if __name__=='__main__':

        user=User()

        #user.recive_registic("suyanzeng",123456)

        #print(user.identify_user_if_same('zhanglong'))

        #print(user.identify_user_is_true('zhangsan',123456))

        #print(user.add_credit_once('xuwenhui'))

        #print(user.buy_article_with_credit('xuwenhui',10))

        user.buy_article_with_credit('xuwenhui',10)

    '''

    启动类main.py

    #coding=utf-8

    from flask import Flask, render_template, request, current_app

    import os

    from flask_sqlalchemy import SQLAlchemy

    import pymysql

    #from gevent import pywsgi

    pymysql.install_as_MySQLdb()

    app = Flask(__name__, template_folder='template', static_folder='resource', static_url_path='/')

    app.config['SECRET_KEY']=os.urandom(24)  #sesession随机种子

    #集成的flask-sqlalchemy处理数据库

    app.config['SQLALCHEMY_DATABASE_URI']='mysql://root:Xwh190823@139.9.209.93:3306/woliubiji'

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS']=False

    #实例化sqlalchemy对象

    db=SQLAlchemy(app)

    #拦截器

    @app.errorhandler(404)

    def page_error(e):

        return render_template('page_no_found.html')

    @app.errorhandler(505)

    def server_error(e):

        return render_template('server_is_busy.html')

    @app.errorhandler(401)

    def page_error(e):

        return 'page_no_found.html',401

    #定义文章类别函数,没有用bluepirnt要注册jinja2是blueprint内置的

    @app.context_processor  #上下文处理器,对html中的一个字符修改相当于自定义一个函数共jinja2使用

    def type_transform():

        typeExample={

            1:"科幻片",

            2:"战争片",

            3:"武侠片",

            4:"电视剧",

            5:"魔幻片"

        }

        return dict(article_type=typeExample)

    """

    @app.jinja_env.filters.update  #jinja2自定义过滤器对文字的长度限制

    def myTruncate(s,length,end=...):

        pass

    """

    if __name__ == '__main__':

        from controller.index import *

        app.register_blueprint(index)

        from controller.userregistic import *

        app.register_blueprint(user)

        # 开启debug模式

    #  app.run(debug=True)

        app.run(host='127.0.0.1',port=5000)

      #  server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)

      #  server.serve_forever()

    静态资源

    数据库sql文件

    源码和数据库数据

    服务器运行

    该项目也是可以直接在服务器运行的,如下用法SSH工具上传到服务器

    进入到主目录

    上传到服务器需要将数据库地址改为本地,数据库本地地址对外网来说就是远程地址

    然后输入python3 main.py启动项目

    启动后直接输入ip+端口号访问

    需要注意的是,退出SSH工具后就服务就停止了,使用nohup命令启动持久化即可Linux nohup 命令。

    相关文章

      网友评论

        本文标题:python flask框架搭建一个个人博客

        本文链接:https://www.haomeiwen.com/subject/gxhfzdtx.html