在本章我们实现类似于 Twitter 和其他社交网络的“粉丝”功能。应用的用户能够关注其他用户也能够知道谁在关注我。在本章我们要扩展数据库模型来实现这些功能。
深入理解数据库关系
我们的需求是为每个用户维护一个“粉丝”用户列表(followers)和“已关注”用户列表(followed)。不幸的是,关系型数据库没有列表类型的字段来保存它们,那么只能通过表的现有字段和他们之间的关系来实现。
数据库已有一个代表用户的表,所以剩下的就是如何组织他们之间的关注与被关注的关系。 这正是回顾基本数据库关系类型的好时机:
一对多
我已经在前面章节用过了一对多关系。这是该关系的示意图:
用户和用户动态是一对多关系。其中,一个用户拥有多条用户动态,而一条用户动态只属于一个用户(作者)。
数据库在多的这方使用了一个外键以表示一对多关系。在上面的一对多关系中,外键是 post
表的 user_id
字段,这个字段将用户的每条动态都与其作者关联了起来。
反过来通过 post
表中的 user_id
字段,也可以查询到指定用户的所有用户动态的列表,返回所有 user_id
字段等于某指定用户 id
的用户动态即可。
多对多
多对多关系会更加复杂,举个例子,数据库中有 students
表和 teachers
表,一名学生学习多位老师的课程,一位老师教授多名学生。这就像两个重叠的一对多关系。
对于这种类型的关系,我想要能够查询数据库来获取教授给定学生的教师的列表,以及某个教师课程中的学生的列表。 想要在关系型数据库中梳理这样的关系并非轻易而举,因为无法通过向现有表添加外键来完成此操作。
展现多对多关系需要使用额外的关联表。以下是数据库如何查找学生和教师的示例:
多对一和一对一
多对一关系类似于一对多关系。 不同的是,这种关系是从“多”的角度来看的。
一对一的关系是一对多的特例。 实现是相似的,但是一个约束被添加到数据库,以防止“多”一方有多个链接。
实现粉丝机制
查看所有关系类型的概要,很容易确定维护粉丝关系的正确数据模型是多对多关系,因为用户可以关注多个其他用户,并且用户可以拥有多个粉丝。不过,在学生和老师的例子中,多对多关系关联了两个实体。 但在粉丝关系中,用户关注其他用户,只有一个用户实体。
那么,多对多关系的第二个实体是什么呢?该关系的第二个实体也是用户。 一个类的实例被关联到同一个类的其他实例的关系被称为自引用关系,这正是我在这里所用到的。
使用自引用多对多关系来实现粉丝机制的表结构示意图:
followers
表是关系的关联表。 此表中的外键都指向用户表中的数据行,因为它将用户关联到用户。 该表中的每个记录代表关注者和被关注者的一个关系。
数据库模型的实现
首先,让我们在数据库模型中添加 followers
关联表:
# app\models.py
followers = db.Table(
'followers',
db.Column('follower_id', db.Integer, db.ForeignKey('user.id')),
db.Column('followed_id', db.Integer, db.ForeignKey('user.id'))
)
关联表并没有像用户和用户动态那样,将表声明为模型。 因为这是一个除了外键没有其他数据的辅助表,所以我创建它的时候没有关联到模型类。
在用户表中声明多对多的关系:
# app\models.py
class User(db.Model, UserMixin):
# ...
followed = db.relationship(
'User',
secondary=followers,
primaryjoin=(followers.c.follower_id == id),
secondaryjoin=(followers.c.followed_id == id),
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic'
)
这里使用 db.relationship
方法来定义模型类中的关系。这种关系将 User
实例关联到其他 User
实例。按照惯例,左侧用户关注右侧用户。所以在左侧的用户中定义了 followed
的关系,当我从左侧查询这个关系时,我将得到已关注的用户列表(即右侧的列表)。
让我们逐个检查这个 db.relationship()
所有的参数:
-
'User'
是关系当中的右侧实体(将左侧实体看成是上级类)。由于这是自引用关系,所以在两侧都使用同一个实体。 -
secondary
指定了用于该关系的关联表,就是使用我在上面定义的followers
。 -
primaryjoin
指明了右侧实体(被关注着)通过关系表关联到左侧实体(已关注者)的条件 。举个例子,要得到ID
为 1 的用户的已关注者,就需要在关系表中查找follower_id
为 1 的记录,所以设定的关联条件是follower_id
字段与当前用户的ID
匹配。该关联条件用followers.c.follower_id == id
表达式表示。 -
secondaryjoin
指明了左侧实体(已关注者)通过关系表关联到右侧实体(被关注者)的条件 。 这个条件与primaryjoin
类似。举个例子,要得到ID
为 1 的用户的被关注者,就需要在关系表中查找followed_id
为 1 的记录,所以设定的关联条件是 followed_id 字段与当前用户的 ID 匹配。该关联条件用followers.c.followed_id == id
表达式表示。 -
backref
定义了右侧实体如何访问该关系。在左侧,关系被命名为followed
,所以在右侧我将使用followers
来表示所有左侧用户的列表,即粉丝列表。附加的lazy
参数表示这个查询的执行模式,设置为动态模式的查询不会立即执行,直到被调用。 -
lazy
和backref
中的lazy
类似,只不过当前的这个是应用于左侧实体,backref
中的是应用于右侧实体。
我们不妨直接看看数据库 followers
关系表中的情况,假设该表有如下数据:
follower_id | followed_id |
---|---|
1 | 2 |
1 | 3 |
2 | 1 |
2 | 3 |
从左侧关联右侧看,第一行表示 ID 为 1 的用户关注了(followed)ID 为 2 的用户,从第二行我们也能得知该用户也关注了 ID 为 3 的用户。使用 followed
关键字来进行这种查询:
>>> u1 = User.query.get(1)
>>> u1.followed .all()
[<User tom>, <User james>]
从右侧关联左侧看,第一行表示 ID 为 2 的用户的关注者(folloer)为 ID 为 1 的用户,使用 followers
关键字进行这种查询:
>>> u2 = User.query.get(2)
>>> u2.followers.all()
[<User diego>]
数据库的变更后,现在需要记执行数据库迁移并升级:
(venv) $ flask db migrate -m "followers"
[2021-07-23 13:51:13,255] INFO in __init__: Microblog startup!
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'followers'
Generating ...\migrations\versions\5f1d8e95fc0b_followers.py ... done
(venv) $ flask db upgrade
[2021-07-23 13:51:39,649] INFO in __init__: Microblog startup!
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade cf1611920c49 -> 5f1d8e95fc0b, followers
关注和取消关注
因为 SQLAlchemy ORM 的使用,一个用户关注另一个用户的行为可以通过 followed
关系抽象成一个列表来简便使用。 例如,如果我有两个用户存储在 user1
和 user2
变量中。
我们这样来实现 user1
关注 user2
:
user1.followed.append(user2)
要 user1
取消关注 user2
,可以这么做:
user1.followed.remove(user2)
当然更适当的方法是在用户模型添加新的方法来封装好 follow
和 unfollow
方法,以提高代码的可重用性。
# app\models.py
class User(db.Model, UserMixin):
# ...
def follow(self, user):
if not self.is_following(user):
self.followed.append(user)
def unfollow(self, user):
if self.is_following(user):
self.followed.remove(user)
def is_following(self, user):
return self.followed.filter(
followers.c.followed_id == user.id).count() > 0
在处理关系之前,我们使用一个 is_following()
方法来确认操作的前提条件是否符合,如果两个用户本身已存在关注/被关注关系,就没必要重复操作了。 相同的逻辑可以应用于取消关注。
is_following()
方法用来查询来检查当前用户是否已关注某指定用户。在之前我们使用 SQLAlchemy
查询对象的 filter_by()
方法,例如,查找指定用户名的用户。在这里我们使用 filter()
方法它更加偏向底层,因为它可以包含任意的过滤条件,而不像 filter_by()
,它只能检查是否等于一个常量值。
如果要检查 A 用户是否关注了 B 用户,要检查 A 的 followed
查询集中 followed_id
是否有等于 B 用户的 id
的。查询以 count()
结束,如果 >0
则表示 A 用户关注了 B 用户。
查看已关注用户的动态
下面我们来实现用户查看已关注用户动态的功能,这需要在众多条用户动态在找出当前用户所关注的用户所发布的动态。这需要用数据库查询来返回这些用户动态。
最显而易见的方案是先执行一个查询以返回已关注用户的列表,用 user.followed.all()
语句即可。然后用一个循环,对每个已关注的用户执行一个查询来返回他们的用户动态,最后将所有用户的动态按照日期时间倒序合并到一个列表中。听起来不错?其实不然。
这种方法有几个问题。 如果一个用户关注了一千人,会发生什么? 我需要执行一千个数据库查询来收集所有的用户动态。 然后我需要合并和排序内存中的一千个列表。
第二个问题,考虑到应用主页最终将实现分页,所以它不会显示所有可用的用户动态,只能是前几个, 如果我要按它们的日期排序来显示动态,我怎么能知道哪些用户动态才是所有用户中最新的呢?除非我首先得到了所有的用户动态并对其进行排序。 这实际上是一个糟糕的解决方案,不能很好地应对规模化。
用户动态的合并和排序操作是必须的,但是在应用层面执行会导致效率十分低下, 而这种工作是关系数据库擅长的。所以我真正想要提供的方案是,定义我想要得到的信息来执行一个数据库查询,然后让数据库找出如何以最有效的方式来提取这些信息。
我们先在用户数据库模型中添加一个新的查询方法:
# app\models.py
class User(db.Model, UserMixin):
# ....
def followed_posts(self):
return Post.query \
.join(followers, (followers.c.followed_id == Post.user_id)) \
.filter(followers.c.follower_id == self.id) \
.order_by(Post.timestamp.desc())
这是迄今为止我在这个应用中使用的最复杂的查询。我将尝试一步一步地解读这个查询。 如果你看一下这个查询的结构,你会注意到有三个主要部分,分别是 join()
、filter()
和 order_by()
,他们都是 SQLAlchemy 查询对象的方法。
联合查询
要理解 join
操作的功能,我们来看一个例子。 假设我有一个包含以下内容的 User
表:
id | username |
---|---|
1 | diego |
2 | tom |
3 | james |
4 | luis |
为了简单起见,我只会保留用户模型的 id
和 username
字段以便进行查询,其他的都略去。
假设 followers
关系表中数据如下:
follower_id | followed_id |
---|---|
1 | 2 |
1 | 3 |
2 | 1 |
3 | 2 |
可以理解用户 diego (id=1
)关注了 用户 tom(id=2
) 和 james(id=3
),用户 tom(id=2
) 关注了 diego(id=1
),用户 James(id=3
)关注了 tom(id=2
),用户 luis(id=4
)没有出现在该表,因为它没任何关注/被关注的关系。
最后,用户动态表数据如下,四位用户各种发表了一条动态(注:这张表也省略了部分字段方便讲解):
id | body | user_id |
---|---|---|
1 | first Post by user 1 | 1 |
2 | first Post by user 2 | 2 |
3 | first Post by user 3 | 3 |
4 | first Post by user 4 | 4 |
这是该查询的 join()
联合查询:
Post.query.join(followers, (followers.c.followed_id == Post.user_id))
如果我们暂时没弄懂上面的 ORM 查询语句,不妨用 str()
方法把它转换为原生 SQL 语句看看:
>>> str(Post.query.join(followers, (followers.c.followed_id == Post.user_id)))
SELECT
post.id,
post.body,
post.user_id,
followers.follower_id,
followers.followed_id
FROM
post
JOIN
followers
ON followers.followed_id = post.user_id
(注:这里为了方便演示我加上了 followers.followed_id
和 followers.follower_id
语句,并修改删除了 post 部分字段)
执行后得到一张临时表:
id | body | user_id | follower_id | followed_id |
---|---|---|---|---|
1 | first Post by user 1 | 1 | 2 | 1 |
2 | first Post by user 2 | 2 | 1 | 2 |
2 | first Post by user 2 | 2 | 3 | 2 |
3 | first Post by user 3 | 3 | 1 | 3 |
注意 user_id
和 followed_id
列在所有数据行中都是相等的,因为这是 join
条件。用户 luis 的动态不会出现在临时表中,因为它和其他用户没有关注关系,不满足 join
条件;用户 tom 的动态会出现两次,因为它有 2 个粉丝。
创建了这个 join
操作是这个查询最关键的一步,下面我们继续完成这个查询。
过滤
join
操作得到的一张临时表远超出我想要的那部分数据。 我们只需要这个列表的一个子集,即:某个用户关注的用户们的动态,所以我需要用 filter()
来剔除所有我不需要的数据。
假设用户 diego(id=1
)要查询自己所关注的用户的动态,就需要在临时表筛选出 follower_id
等于 1 的项目。
我们这样写过滤部分的查询语句:
filter(followers.c.follower_id == self.id)
用原生 SQL 语句来表达就相当于上一节的查询再添加一个 WHERE
子句增加过滤条件:
SELECT
post.id,
post.body,
post.user_id,
followers.follower_id,
followers.followed_id
FROM
post
JOIN
followers
ON followers.followed_id = post.user_id
WHERE
followers.follower_id = 1
过滤后得到这张临时表:
id | body | user_id | follower_id | followed_id |
---|---|---|---|---|
2 | first Post by user 2 | 2 | 1 | 2 |
3 | first Post by user 3 | 3 | 1 | 3 |
它就是用户 diego(id=1
)所关注的用户的全部动态。
排序
查询流程的最后一步是对结果进行排序。这部分的查询语句如下:
order_by(Post.timestamp.desc())
在这里,把用户动态按产生的时间戳按降序排列。排序之后,第一个结果将是最新的用户动态。
组合自身动态和关注的用户动态
现在显示已关注用户的动态功能已经实现,回忆我们使用 Twitter 等社交平台的经历,在主页的时间线中除了看到我们所关注的用户动态外,还会看到我们自己的用户动态。现在我要把这两者再组合起来。
这里我使用第二个查询返回用户自己的动态,然后使用 union
操作将已关注用户动态和用户本身的动态两个查询合并。
# app\models.py
class User(db.Model, UserMixin):
# ...
def followed_posts(self):
own = Post.query.filter_by(user_id=self.id)
followed = Post.query \
.join(followers, (followers.c.followed_id == Post.user_id)) \
.filter(followers.c.follower_id == self.id)
return followed.union(own).order_by(Post.timestamp.desc())
注意,followed
和 own
查询结果集是在排序之前进行的合并。
对用户模型执行单元测试
虽然我不担心这个用户模型的粉丝机制的运行是否无误。 但当我编写更多的代码时,会担心的是在应用的不同部分修改了代码之后,如何确保本处代码将来会继续工作。确保已经编写的代码在将来继续有效的最佳方法是创建一套自动化测试,你可以在每次更新代码后执行测试。
Python
包含一个非常有用的 unittest
包,可以轻松编写和执行单元测试。现在为 User
类中的现有方法编写一些单元测试并存储到 tests.py
模块:
# tests.py
from datetime import datetime, timedelta
import unittest
from app import db, create_app
from app.models import User, Post
from config import TestConfig
class UserModelCase(unittest.TestCase):
def setUp(self):
from app.models import User, Post
self.app = create_app(TestConfig)
self.app_context = self.app.app_context()
self.app_context.push()
# from app.models import User, Post
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
def test_password_hashing(self):
u = User(username='susan')
u.set_password('cat')
self.assertFalse(u.check_password('dog'))
self.assertTrue(u.check_password('cat'))
def test_avatar(self):
u = User(username='john', email='john@example.com')
self.assertEqual(u.avatar(128), ('https://www.gravatar.com/avatar/ \
d4c74594d841139328695756648b6bd6?s=128&d=robohash'))
def test_follow(self):
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
db.session.add(u1)
db.session.add(u2)
db.session.commit()
self.assertEqual(u1.followed.all(), [])
self.assertEqual(u1.followers.all(), [])
u1.follow(u2)
db.session.commit()
self.assertTrue(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 1)
self.assertEqual(u1.followed.first().username, 'susan')
self.assertEqual(u2.followers.count(), 1)
self.assertEqual(u2.followers.first().username, 'john')
u1.unfollow(u2)
db.session.commit()
self.assertFalse(u1.is_following(u2))
self.assertEqual(u1.followed.count(), 0)
self.assertEqual(u2.followers.count(), 0)
def test_follow_posts(self):
# create four users
u1 = User(username='john', email='john@example.com')
u2 = User(username='susan', email='susan@example.com')
u3 = User(username='mary', email='mary@example.com')
u4 = User(username='david', email='david@example.com')
db.session.add_all([u1, u2, u3, u4])
# create four posts
now = datetime.utcnow()
p1 = Post(body="post from john", author=u1,
timestamp=now + timedelta(seconds=1))
p2 = Post(body="post from susan", author=u2,
timestamp=now + timedelta(seconds=4))
p3 = Post(body="post from mary", author=u3,
timestamp=now + timedelta(seconds=3))
p4 = Post(body="post from david", author=u4,
timestamp=now + timedelta(seconds=2))
db.session.add_all([p1, p2, p3, p4])
db.session.commit()
# setup the followers
u1.follow(u2) # john follows susan
u1.follow(u4) # john follows david
u2.follow(u3) # susan follows mary
u3.follow(u4) # mary follows david
db.session.commit()
# check the followed posts of each user
f1 = u1.followed_posts().all()
f2 = u2.followed_posts().all()
f3 = u3.followed_posts().all()
f4 = u4.followed_posts().all()
self.assertEqual(f1, [p2, p4, p1])
self.assertEqual(f2, [p2, p3])
self.assertEqual(f3, [p3, p4])
self.assertEqual(f4, [p4])
if __name__ == '__main__':
unittest.main(verbosity=2)
我添加了四个用户模型的测试,包含密码哈希、用户头像和粉丝功能。
setUp()
和 tearDown()
方法是单元测试框架分别在每个测试之前和之后执行的特殊方法。我在 setUp()
中实现了一些小技巧,让进行单元测试时不要使用我用于开发的常规数据库,就是将应用配置更改为 sqlite://
,我在测试过程中通过 SQLAlchemy
来使用 SQLite
内存数据库。db.create_all()
创建所有的数据库表。这是从头开始创建数据库的快速方法,在测试中相当好用。
在单元测试和开发环境中,我们可能需要使用不同的配置,现在我们修改 config.py
文件,添加一套用于测试的配置:
# config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config(object):
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False
class TestConfig(Config):
SQLALCHEMY_DATABASE_URI = 'sqlite:///'
TestConfig
继承了常规配置 Config
,拥有 Config
原有的配置项,唯一不同的是修改了数据库路径 SQLALCHEMY_DATABASE_URI = 'sqlite:///'
。
接下来我们再修改工厂函数,让其能接受一个配置类作为参数从而创建 app
:
# app\__init__.py
def create_app(config):
app = Flask(__name__)
app.config.from_object(config)
# ...
自然地,microblog.py
也要修改:
# microblog.py
from config import Config
app = create_app(Config)
修改完成后我们运行我们的单元测试:
(venv) $ python tests.py
test_avatar (__main__.UserModelCase) ... [2021-07-26 11:00:46,156] INFO in __init__: Microblog startup!
ok
test_follow (__main__.UserModelCase) ... [2021-07-26 11:00:46,260] INFO in __init__: Microblog startup!
ok
test_follow_posts (__main__.UserModelCase) ... [2021-07-26 11:00:46,291] INFO in __init__: Microblog startup!
ok
test_password_hashing (__main__.UserModelCase) ... [2021-07-26 11:00:46,317] INFO in __init__: Microblog startup!
ok
----------------------------------------------------------------------
Ran 4 tests in 0.640s
OK
从现在起,每次对应用进行更改时,都可以重新运行测试,以确保应用原有功能正常。 另外,每次将另一个功能添加到应用时,都应该为其编写一个单元测试。
在应用中集成粉丝机制
数据库模型中粉丝机制的实现现在已经完成,现在将它集成到应用中,这些都基于前面几章的概念。
先添加两个新的路由和视图函数,它们提供了用户关注和取消关注的 URL 和逻辑实现:
# app\auth\routes.py
@auth_routes.route('/follow/<username>')
@login_required
def follow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('User {} not found.'.format(username))
return redirect(url_for('main.index'))
if user == current_user:
flash('You cannot follow yourself!')
return redirect(url_for('auth.user', username=username))
current_user.follow(user)
db.session.commit()
flash('You are following {}!'.format(username))
return redirect(url_for('auth.user', username=username))
@auth_routes.route('/unfollow/<username>')
@login_required
def unfollow(username):
user = User.query.filter_by(username=username).first()
if user is None:
flash('User {} not found.'.format(username))
return redirect(url_for('main.index'))
if user == current_user:
flash('You cannot unfollow yourself!')
return redirect(url_for('auth.user', username=username))
current_user.unfollow(user)
db.session.commit()
flash('You are not following {}.'.format(username))
return redirect(url_for('auth.user', username=username))
视图函数的逻辑不言而喻,但要注意所有的错误检查,以防止出现意外的问题,并在出现问题时向用户提供有用的信息。
现在把这两个视图函数的路由添加到用户的个人主页中,以便其他用户执行关注和取消关注的操作:
# app\templates\auth\user.html
<p>
{{ user.followers.count() }} followers, {{ user.followed.count() }} following.
</p>
{% if user == current_user %}
<p>
<a href="{{ url_for('auth.edit_profile') }}">Edit your profile</a>
</p>
{% elif not current_user.is_following(user) %}
<p>
<a href="{{ url_for('auth.follow', username=user.username) }}">Follow</a>
</p>
{% else %}
<p>
<a href="{{ url_for('auth.unfollow', username=user.username) }}">Unfollow</a>
</p>
{% endif %}
首先我们添加了一个显示某用户有多少已关注和多少粉丝的显示。然后我们用一组 if
语句来处理登录用户和当前用户页面的关系:
- 如果用户查看自己的个人主页,仍然是
Edit
链接不变。 - 如果用户查看其他未关注的用户的个人主页,显示
Follow
链接。 - 如果用户查看其他已经关注的用户的个人主页,显示
Unfollow
链接。
本文源码:https://github.com/SingleDiego/Flask-Tutorial-Source-Code/tree/SingleDiego-patch-08
网友评论