继上次爬虫的文章说明后,现开始添加web方向,从flask框架开始搭建
开发环境:
操作系统: Ubuntu 16+
数据库: MySQL + Redis
python解释器: 3.0+
编辑器: PyCharm
本项目为前后端不分离,但前后端分离的项目仍然可以参考,因为只是返回值形式不同
目录搭建:
项目总目录.png- 管理文件:manage.py
该文件下用于实例化程序实例、管理器对象、数据库迁移等
from info import create_app, db, models
app = create_app('development')
#导入flask_script与flask_migrate扩展包实现数据库迁移(迁移时使用终端)
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
#实例化管理器对象
manage = Manager(app)
#使用迁移框架
Migrate(app, db)
#添加迁移命令
manage.add_command('db', MigrateCommand)
if __name__ == '__main__':
print(app.url_map)
# app.run()
manage.run() # 替代app.run()在终端运行,若在pycharm中运行,需要添加脚本参数runserver
- 配置文件:config.py
该文件下存放一些项目的配置信息,如数据库的配置和运行模式配置
# 导入redis模块实现redis数据库的连接
from redis import StrictRedis
class Config():
# 配置密钥
SECRET_KEY = 'LzqsHtQCz6MU/pyfuu61x1gE2qOC5qDwPWz3ewlS/I6S68QxtVQjsw=='
# 配置状态保持session,将其保存在Redis数据库中
SESSION_TYPE = 'redis' # 使用redis数据库
# 定义redis数据库的主机和端口
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
SESSION_REDIS = StrictRedis(host=REDIS_HOST, port=REDIS_PORT) # 连接redis数据库
SESSION_USE_SIGNER = True # 设置签名
PERMANENT_SESSION_LIFETIME = 86400 # 设置有效期
# 配置mysql数据库连接(以本地连接为例)
SQLALCHEMY_DATABASE_URI = 'mysql://root:mysql@localhost/数据库名称' # 应先创建mysql数据库
SQLALCHEMY_TRACK_MODIFICATIONS = False
# 开发模式
class DevelopmentConfig(Config):
DEBUG = True
# 生产模式
class ProductionConfig(Config):
DEBUG = False
config_dict = {
'development': DevelopmentConfig,
'production': ProductionConfig
}
- .gitignore文件里写明一些在项目中使用Git提交时需要忽略的文件/ 夹,如:
.idea/
*.py[cop]
__pycache__/
logs/
migrations/
-
migrations文件夹是数据库迁移时自动生成的,而不是New出来的,在进行数据库迁移之前,MySQL中要先有数据库,并且项目中要先创建好models.py文件,因为该文件中的各个模型类映射各个数据表
-
logs文件夹需要New,其下存放着项目运行中生成的日志文件
-
info目录是整个项目的模块目录,所有模块都建在其中
info文件夹.png- models.py
该文件为创建数据表的模型类,数据表的关系,返回的一些字段都在其中定义,这也是项目的核心,对于一般项目的表关系,每个后端人员都应当会创建
- models.py
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from info import constants
from . import db
class BaseModel(object):
"""模型基类,为每个模型补充创建时间与更新时间"""
create_time = db.Column(db.DateTime, default=datetime.now) # 记录的创建时间
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now) # 记录的更新时间
# 用户收藏表,建立用户与其收藏新闻多对多的关系
tb_user_collection = db.Table(
"info_user_collection",
db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True), # 新闻编号
db.Column("news_id", db.Integer, db.ForeignKey("info_news.id"), primary_key=True), # 分类编号
db.Column("create_time", db.DateTime, default=datetime.now) # 收藏创建时间
)
tb_user_follows = db.Table(
"info_user_fans",
db.Column('follower_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True), # 粉丝id
db.Column('followed_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True) # 被关注人的id
)
class User(BaseModel, db.Model):
"""用户"""
__tablename__ = "info_user"
id = db.Column(db.Integer, primary_key=True) # 用户编号
nick_name = db.Column(db.String(32), unique=True, nullable=False) # 用户昵称
password_hash = db.Column(db.String(128), nullable=False) # 加密的密码
mobile = db.Column(db.String(11), unique=True, nullable=False) # 手机号
avatar_url = db.Column(db.String(256)) # 用户头像路径
last_login = db.Column(db.DateTime, default=datetime.now) # 最后一次登录时间
is_admin = db.Column(db.Boolean, default=False)
signature = db.Column(db.String(512)) # 用户签名
gender = db.Column(
db.Enum(
"MAN", # 男
"WOMAN" # 女
),
default="MAN"
)
# 当前用户收藏的所有新闻
collection_news = db.relationship("News", secondary=tb_user_collection, lazy="dynamic") # 用户收藏的新闻
# 用户所有的粉丝,添加了反向引用followed,代表用户都关注了哪些人
followers = db.relationship('User',
secondary=tb_user_follows,
primaryjoin=id == tb_user_follows.c.followed_id,
secondaryjoin=id == tb_user_follows.c.follower_id,
backref=db.backref('followed', lazy='dynamic'),
lazy='dynamic')
# 当前用户所发布的新闻
news_list = db.relationship('News', backref='user', lazy='dynamic')
@property
def password(self):
raise AttributeError("当前属性不可读")
@password.setter
def password(self, value):
self.password_hash = generate_password_hash(value)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def to_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"avatar_url": constants.QINIU_DOMIN_PREFIX + self.avatar_url if self.avatar_url else "",
"mobile": self.mobile,
"gender": self.gender if self.gender else "MAN",
"signature": self.signature if self.signature else "",
"followers_count": self.followers.count(),
"news_count": self.news_list.count()
}
return resp_dict
def to_admin_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"mobile": self.mobile,
"register": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"last_login": self.last_login.strftime("%Y-%m-%d %H:%M:%S"),
}
return resp_dict
class News(BaseModel, db.Model):
"""新闻"""
__tablename__ = "info_news"
id = db.Column(db.Integer, primary_key=True) # 新闻编号
title = db.Column(db.String(256), nullable=False) # 新闻标题
source = db.Column(db.String(64), nullable=False) # 新闻来源
digest = db.Column(db.String(512), nullable=False) # 新闻摘要
content = db.Column(db.Text, nullable=False) # 新闻内容
clicks = db.Column(db.Integer, default=0) # 浏览量
index_image_url = db.Column(db.String(256)) # 新闻列表图片路径
category_id = db.Column(db.Integer, db.ForeignKey("info_category.id"))
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id")) # 当前新闻的作者id
status = db.Column(db.Integer, default=0) # 当前新闻状态 如果为0代表审核通过,1代表审核中,-1代表审核不通过
reason = db.Column(db.String(256)) # 未通过原因,status = -1 的时候使用
# 当前新闻的所有评论
comments = db.relationship("Comment", lazy="dynamic")
def to_review_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"status": self.status,
"reason": self.reason if self.reason else ""
}
return resp_dict
def to_basic_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"index_image_url": self.index_image_url,
"clicks": self.clicks,
}
return resp_dict
def to_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"comments_count": self.comments.count(),
"clicks": self.clicks,
"category": self.category.to_dict(),
"index_image_url": self.index_image_url,
"author": self.user.to_dict() if self.user else None
}
return resp_dict
class Comment(BaseModel, db.Model):
"""评论"""
__tablename__ = "info_comment"
id = db.Column(db.Integer, primary_key=True) # 评论编号
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id"), nullable=False) # 用户id
news_id = db.Column(db.Integer, db.ForeignKey("info_news.id"), nullable=False) # 新闻id
content = db.Column(db.Text, nullable=False) # 评论内容
parent_id = db.Column(db.Integer, db.ForeignKey("info_comment.id")) # 父评论id
parent = db.relationship("Comment", remote_side=[id]) # 自关联
like_count = db.Column(db.Integer, default=0) # 点赞条数
def to_dict(self):
resp_dict = {
"id": self.id,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"parent": self.parent.to_dict() if self.parent else None,
"user": User.query.get(self.user_id).to_dict(),
"news_id": self.news_id,
"like_count": self.like_count
}
return resp_dict
class CommentLike(BaseModel, db.Model):
"""评论点赞"""
__tablename__ = "info_comment_like"
comment_id = db.Column("comment_id", db.Integer, db.ForeignKey("info_comment.id"), primary_key=True) # 评论编号
user_id = db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True) # 用户编号
class Category(BaseModel, db.Model):
"""新闻分类"""
__tablename__ = "info_category"
id = db.Column(db.Integer, primary_key=True) # 分类编号
name = db.Column(db.String(64), nullable=False) # 分类名
news_list = db.relationship('News', backref='category', lazy='dynamic')
def to_dict(self):
resp_dict = {
"id": self.id,
"name": self.name
}
return resp_dict
-
constants.py文件为常量文件
-
init.py文件
项目模块的初始化文件,如数据库的实例化对象,添加项目日志工具,返回项目实例化对象的工厂函数,该函数中会将需要与项目对象进行关联的类进行实例化,如:蓝图的注册。csrf保护也在该函数中进行
# 各种初始化文件放入info包的init文件中
from flask import Flask
from config import config_dict, Config # 导入配置信息
from flask_session import Session # 导入flask_session扩展包,实现状态保持
from flask_sqlalchemy import SQLAlchemy
# 导入redis连接对象
from redis import StrictRedis
# 导入标准日志模块和日志处理模块
import logging
from logging.handlers import RotatingFileHandler
from flask_wtf import CSRFProtect, csrf
# 理解成实例化MySQL
db = SQLAlchemy()
# 实例化redis对象,用来保存和业务相关的数据,比如图片验证码,短信验证码等
redis_store = StrictRedis(host=Config.REDIS_HOST, port=Config.REDIS_PORT,
decode_responses=True)
# 设置日志的记录等级
logging.basicConfig(level=logging.DEBUG) # 调试debug级
# 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
file_log_handler = RotatingFileHandler("logs/log", maxBytes=1024*1024*100, backupCount=10)
# 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
formatter = logging.Formatter('%(levelname)s %(filename)s:%(lineno)d %(message)s')
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局的日志工具对象(flask app使用的)添加日志记录器
logging.getLogger().addHandler(file_log_handler)
# 设置工厂函数,实例化一些类,并返回app对象
# 可以给函数传入参数,决定程序以什么模式运行
def create_app(config_name):
app = Flask(__name__)
# 添加配置信息
app.config.from_object(config_dict[config_name])
# 注意这些实例化的代码需要配置密钥,所以先在上方设置密钥
Session(app) # 实例化Session
db.init_app(app)
# 为项目开启csrf保护
CSRFProtect(app)
# 生成csrf_token令牌,给每个请求的客户端
@app.after_request
def after_request(response):
csrf_token = csrf.generate_csrf() # 请求带来的token会在此校验
response.set_cookie('csrf_token', csrf_token)
return response
# 导入蓝图并注册:
# 新闻模块的蓝图
from info.modules.news import news_blue
app.register_blueprint(news_blue)
# 验证模块的蓝图
from info.modules.passport import passport_blue
app.register_blueprint(passport_blue)
# 导入自定义过滤器
from info.utils.commons import index_click_filter
app.add_template_filter(index_click_filter, 'index_click_filter')
return app
-
utils文件夹为一些公用的或第三方工具,如一些装饰器,状态码的定义都可在其中
-
templates为模板文件夹
-
static为前端文件夹,用于存放前端文件或一些静态文件
-
libs文件夹为第三方的SDK及其相关文件的存放处
-
modules文件夹
其中即为项目的各个模块
用于存放项目各个模块的文件夹.png
1、在各个模块的init.py文件中:
创建蓝图对象,并将使用该蓝图对象的文件导入到创建蓝图对象的下方:
from flask import Blueprint
# 创建蓝图对象
passport_blue = Blueprint('passport_blue', __name__)
# 导入引用了该蓝图对象的文件
from . import views
2、在各个模块的views.py文件中:
首先导入该模块的蓝图对象,用以视图函数的路由映射
其他的就是写接口,具体的开发代码在各个视图函数中实现即可:
# 导入蓝图对象
from . import passport_blue
from flask import request, jsonify, current_app, make_response, abort, session
# 导入自定义的状态码
from info.utils.response_code import RET
# 导入生成图片验证码的库
from info.utils.captcha.captcha import captcha
# 导入redis数据库实例
from info import redis_store, constants, db
import random, re
# 导入模型类
from info.models import User
# 导入运通迅发送短信验证码的接口
from info.libs.yuntongxun.sms import CCP
from datetime import datetime
'''生成图片验证码接口'''
@passport_blue.route('/image_code')
def generate_image_code():
# 获取前端传来的参数uuid
image_code_id = request.args.get('image_code_id')
# 判断参数是否存在
if not image_code_id:
# return abort(403) 对于非ajax请求,返回的内容应该在哪查看?
return jsonify(errno=RET.PARAMERR, errmsg='参数缺失')
# 调用生成验证码图片的函数
name, text, image = captcha.generate_captcha()
# 把图片验证码的text存入redis数据库
try:
redis_store.setex('ImageCode_' + image_code_id, constants.IMAGE_CODE_REDIS_EXPIRES, text)
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='数据保存失败')
response = make_response(image)
# 修改浏览器响应的数据类型,以便测试工具也可以看到结果
response.headers['Content-Type'] = 'image/jpg'
return response
'''获取短信验证码接口'''
@passport_blue.route('/sms_code', methods=['post'])
def get_sms_code():
# 获取手机号,图片验证码及其image_code_id
mobile = request.json.get('mobile')
image_code = request.json.get('image_code')
image_code_id = request.json.get('image_code_id')
# 验证参数完整性
if not all([mobile, image_code, image_code_id]):
return jsonify(errno=RET.PARAMERR, errmsg='参数缺失')
# 验证手机号
if not re.match(r'1[3456789]\d{9}$', mobile):
return jsonify(errno=RET.PARAMERR, errmsg='可能不是一个手机号')
# 验证图片验证码
try:
redis_image_code = redis_store.get('ImageCode_' + image_code_id)
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='redis获取图片验证码失败')
if not redis_image_code:
return jsonify(errno=RET.DBERR, errmsg='图片验证码已过期')
# 删除redis图片验证码
try:
redis_store.delete('ImageCode_' + image_code_id)
except Exception as e:
current_app.logger.error(e)
# 比较从表单获取的图片验证码与redis图片验证码是否一致
if image_code.lower() != redis_image_code.lower():
return jsonify(errno=RET.DATAERR, errmsg='图片验证码填写错误'
# 判断是否曾注册过
try:
user = User.query.filter_by(mobile=mobile).first()
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='查询用户数据失败')
if user:
return jsonify(errno=RET.DATAEXIST, errmsg='用户已存在')
# 调用短信验证码第三方接口
num_code = '%06d' % random.randint(0, 999999)
try:
redis_store.setex('SMSCode_' + mobile, constants.SMS_CODE_REDIS_EXPIRES, num_code)
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='保存短信验证码失败')
# 发送短信验证码.
try:
ccp = CCP()
# 如果1参是字符串,2参0号元素是验证码,1号元素是有效期(分钟);如果一参是int,则反过来
result = ccp.send_template_sms(mobile, [num_code, constants.SMS_CODE_REDIS_EXPIRES / 60], 1)
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.THIRDERR, errmsg='短信验证码发送异常')
if result == 0:
return jsonify(errno=RET.OK, errmsg='发送成功')
else:
return jsonify(errno=RET.THIRDERR, errmsg='短信验证码发送失败')
'''用户注册接口'''
@passport_blue.route('/user_signup', methods=['post'])
def user_signup():
# 接收手机号,验证码,密码
mobile = request.json.get('mobile')
num_code = request.json.get('smscode')
password = request.json.get('password')
# 检查参数完整性
if not all([mobile, num_code, password]):
return jsonify(errno=RET.PARAMERR, errmsg='参数缺失')
# 检查手机号格式
if not re.match('1[3456789]\d{9}$', mobile):
return jsonify(errno=RET.PARAMERR, errmsg='手机号格式错误')
# 尝试从redis中获取短信验证码
try:
redis_num_code = redis_store.get('SMSCode_' + mobile)
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='查询数据失败')
# 判断查询结果
if not redis_num_code:
return jsonify(errno=RET.NODATA, errmsg='验证码已过期')
# 比较验证码是否正确
if redis_num_code != str(num_code):
return jsonify(errno=RET.DATAERR, errmsg='短信验证码错误')
# 删除redis中的验证码
try:
redis_store.get('SMSCode_' + mobile)
except Exception as e:
current_app.logger.error(e)
# 判断是否曾注册过
try:
user = User.query.filter_by(mobile=mobile).first()
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='查询用户数据失败')
if user is not None:
return jsonify(errno=RET.DATAEXIST, errmsg='用户已存在')
# 构造模型类对象,保存用户数据
user = User()
user.mobile = mobile
user.nick_name = mobile
user.password = password
# 提交到mysql
try:
db.session.add(user)
db.session.commit()
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='保存数据失败')
# 缓存用户信息,实现状态保持
session['user_id'] = user.id
session['mobile'] = mobile
session['nick_name'] = user.mobile
return jsonify(errno=RET.OK, errmsg='注册成功')
'''用户登录接口'''
@passport_blue.route('/user_login', methods=['post'])
def login():
# 获取手机号和密码
mobile = request.json.get('mobile')
password = request.json.get('password')
if not all([mobile, password]):
return jsonify(errno=RET.DATAERR, errmsg='数据缺失')
if not re.match(r'1[3456789]\d{9}$', mobile):
return jsonify(errno=RET.PARAMERR, errmsg='可能不是一个手机号')
try:
user = User.query.filter_by(mobile=mobile).first()
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.NODATA, errmsg='用户未注册')
if not user or not user.check_password(password):
return jsonify(errn0=RET.NODATA, errmsg='用户名或密码错误')
user.last_login = datetime.now()
# 将数据提交到数据库
try:
db.session.add(user)
db.session.commit()
except Exception as e:
current_app.logger.error(e)
return jsonify(errno=RET.DBERR, errmsg='保存数据失败')
# 缓存用户信息,实现状态保持
session['user_id'] = user.id
session['mobile'] = user.mobile
session['nick_name'] = user.nick_name
return jsonify(errno=RET.OK, errmsg='登录成功')
'''用户退出登录接口'''
@passport_blue.route('/user_logout')
def logout():
'''
清除当前用户在浏览器的信息
若前后端分离,应使用delete方法
:return:
'''
session.pop('user_id', None)
session.pop('mobile', None)
session.pop('nick_name', None)
return jsonify(errno=RET.OK, errmsg='退出成功')
结束。
网友评论