推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
文章推荐系统 | 四、构建离线文章画像
在上篇文章中,我们已经完成了离线文章画像的构建,接下来,我们要为相似文章推荐做准备,那就是计算文章之间的相似度。首先,我们要计算出文章的词向量,然后利用文章的词向量来计算文章的相似度。
计算文章词向量
我们可以通过大量的历史文章数据,训练文章中每个词的词向量,由于文章数据过多,通常是分频道进行词向量训练,即每个频道训练一个词向量模型,我们包括的频道如下所示
channel_info = {
1: "html",
2: "开发者资讯",
3: "ios",
4: "c++",
5: "android",
6: "css",
7: "数据库",
8: "区块链",
9: "go",
10: "产品",
11: "后端",
12: "linux",
13: "人工智能",
14: "php",
15: "javascript",
16: "架构",
17: "前端",
18: "python",
19: "java",
20: "算法",
21: "面试",
22: "科技动态",
23: "js",
24: "设计",
25: "数码产品",
}
接下来,分别对各自频道内的文章进行分词处理,这里先选取 18 号频道内的所有文章,进行分词处理
spark.sql("use article")
article_data = spark.sql("select * from article_data where channel_id=18")
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])
def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs
abspath = "/root/words"
# 结巴加载用户词典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)
# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")
def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list
# 所有的停用词列表
stopwords_list = get_stopwords_list()
# 分词
def cut_sentence(sentence):
"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词"""
# eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一个词语或者是英文单词
filtered_words_list.append(seg.word)
return filtered_words_list
for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替换掉标签数据
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words
words_df
结果如下所示,words 为分词后的词语列表
接着,使用分词后的所有词语,对 Word2Vec 模型进行训练并将模型保存到 HDFS,其中 vectorSize 为词向量的长度,minCount 为词语的最小出现次数,windowSize 为训练窗口的大小,inputCol 为输入的列名,outputCol 为输出的列名
from pyspark.ml.feature import Word2Vec
w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='vector', minCount=3)
model = w2v_model.fit(words_df)
model.save("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
加载训练好的 Word2Vec 模型
from pyspark.ml.feature import Word2VecModel
w2v_model = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
vectors = w2v_model.getVectors()
vectors
结果如下所示,其中 vector 是训练后的每个词的 100 维词向量,是 vector 类型格式的,如 [0.2 -0.05 -0.1 ...]
这里,我们计算出了所有词语的词向量,接下来,还要得到关键词的词向量,因为我们需要通过关键词的词向量来计算文章的词向量。那么,首先通过读取频道内的文章画像来得到关键词(实际场景应该只读取新增文章画像)
article_profile = spark.sql("select * from article_profile where channel_id=18")
在文章画像表中,关键词和权重是存储在同一列的,我们可以利用 LATERAL VIEW explode()
方法,将 map 类型的 keywords 列中的关键词和权重转换成单独的两列数据
article_profile.registerTempTable('profile')
keyword_weight = spark.sql("select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight")
keyword_weight
结果如下所示,keyword 为关键词,weight 为对应的权重
这时就可以利用关键词 keyword 列,将文章关键词 keyword_weight
与词向量结果 vectors
进行内连接,从而得到每个关键词的词向量
keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')
keywords_vector
结果如下所示,vector 即对应关键词的 100 维词向量
接下来,将文章每个关键词的词向量加入权重信息,这里使每个关键词的词向量 = 关键词的权重 x 关键词的词向量,即 weight_vector = weight x vector,注意这里的 vector
为 vector 类型,所以 weight x vector 是权重和向量的每个元素相乘,向量的长度保持不变
def compute_vector(row):
return row.article_id, row.channel_id, row.keyword, row.weight * row.vector
article_keyword_vectors = keywords_vector.rdd.map(compute_vector).toDF(["article_id", "channel_id", "keyword", "weightingVector"])
article_keyword_vectors
结果如下所示,weightingVector 即为加入权重信息后的关键词的词向量
再将上面的结果按照 article_id 进行分组,利用 collect_set()
方法,将一篇文章内所有关键词的词向量合并为一个列表
article_keyword_vectors.registerTempTable('temptable')
article_keyword_vectors = spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id")
article_keyword_vectors
结果如下所示,vectors 即为文章内所有关键词向量的列表,如 [[0.6 0.2 ...], [0.1 -0.07 ...], ...]
接下来,利用上面得出的二维列表,计算每篇文章内所有关键词的词向量的平均值,作为文章的词向量。注意,这里的 vectors
是包含多个词向量的列表,词向量列表的平均值等于其中每个词向量的对应元素相加再除以词向量的个数
def compute_avg_vectors(row):
x = 0
for i in row.vectors:
x += i
# 求平均值
return row.article_id, row.channel_id, x / len(row.vectors)
article_vector = article_keyword_vectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])
article_vector
结果如下所示
此时,article_vector
中的 vector
列还是 vector 类型,而 Hive 不支持该数据类型,所以需要将 vector 类型转成 array 类型(list)
def to_list(row):
return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]
article_vector = article_vector.rdd.map(to_list).toDF(['article_id', 'channel_id', 'vector'])
在 Hive 中创建文章词向量表 article_vector
CREATE TABLE article_vector
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
articlevector ARRAY<DOUBLE> comment "keyword"
);
最后,将 18 号频道内的所有文章的词向量存储到 Hive 的文章词向量表 article_vector 中
article_vector.write.insertInto("article_vector")
这样,我们就计算出了 18 号频道下每篇文章的词向量,在实际场景中,我们还要分别计算出其他所有频道下每篇文章的词向量。
计算文章相似度
前面我们计算出了文章的词向量,接下来就可以根据文章的词向量来计算文章的相似度了。通常我们会有几百万、几千万甚至上亿规模的文章数据,为了优化计算性能,我们可以只计算每个频道内文章之间的相似度,因为通常只有相同频道的文章关联性较高,而不同频道之间的文章通常关联性较低。在每个频道内,我们还可以用聚类或局部敏感哈希对文章进行分桶,将文章相似度的计算限制在更小的范围,只计算相同分类内或相同桶内的文章相似度。
- 聚类(Clustering),对每个频道内的文章进行聚类,可以使用 KMeans 算法,需要提前设定好类别个数 K,聚类算法的时间复杂度并不小,也可以使用一些优化的聚类算法,比如二分聚类、层次聚类等。但通常聚类算法也比较耗时,所以通常被使用更多的是局部敏感哈希。
Spark 的 BisectingKMeans 模型训练代码示例
from pyspark.ml.clustering import BisectingKMeans
bkmeans = BisectingKMeans(k=100, minDivisibleClusterSize=50, featuresCol="articlevector", predictionCol='group')
bkmeans_model = bkmeans.fit(article_vector)
bkmeans_model.save("hdfs://hadoop-master:9000/headlines/models/articleBisKmeans/channel_%d_%s.bkmeans" % (channel_id, channel))
- 局部敏感哈希 LSH(Locality Sensitive Hashing),LSH 算法是基于一个假设,如果两个文本在原有的数据空间是相似的,那么经过哈希函数转换以后,它们仍然具有很高的相似度,即越相似的文本在哈希之后,落到相同的桶内的概率就越高。所以,我们只需要将目标文章进行哈希映射并得到其桶号,然后取出该桶内的所有文章,再进行线性匹配即可查找到与目标文章相邻的文章。其实 LSH 并不能保证一定能够查找到与目标文章最相邻的文章,而是在减少需要匹配的文章个数的同时,保证查找到最近邻的文章的概率很大。
下面我们将使用 LSH 模型来计算文章相似度,首先,读取 18 号频道内所有文章的 ID 和词向量作为训练集
article_vector = spark.sql("select article_id, articlevector from article_vector where channel_id=18")
train = articlevector.select(['article_id', 'articlevector'])
文章词向量表中的词向量是被存储为 array 类型的,我们利用 Spark 的 Vectors.dense()
方法,将 array 类型(list)转为 vector 类型
from pyspark.ml.linalg import Vectors
def list_to_vector(row):
return row.article_id, Vectors.dense(row.articlevector)
train = train.rdd.map(list_to_vector).toDF(['article_id', 'articlevector'])
使用训练集 train
对 Spark 的 BucketedRandomProjectionLSH
模型进行训练,其中 inputCol 为输入特征列,outputCol 为输出特征列,numHashTables 为哈希表数量,bucketLength 为桶的数量,数量越多,相同数据进入到同一个桶的概率就越高
from pyspark.ml.feature import BucketedRandomProjectionLSH
brp = BucketedRandomProjectionLSH(inputCol='articlevector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)
训练好模型后,调用 approxSimilarityJoin()
方法即可计算数据之间的相似度,如 model.approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance')
就是利用欧几里得距离作为相似度,计算在 df1 与 df2 每条数据的相似度,这里我们计算训练集中所有文章之间的相似度
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')
similar
结果如下所示,EuclideanDistance 就是两篇文章的欧几里得距离,即相似度
在后面的推荐流程中,会经常查询文章相似度,所以出于性能考虑,我们选择将文章相似度结果存储到 Hbase 中。首先创建文章相似度表
create 'article_similar', 'similar'
然后存储文章相似度结果
def save_hbase(partition):
import happybase
pool = happybase.ConnectionPool(size=3, host='hadoop-master')
with pool.connection() as conn:
# 建立表连接
table = conn.table('article_similar')
for row in partition:
if row.datasetA.article_id != row.datasetB.article_id:
table.put(str(row.datasetA.article_id).encode(), {"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})
# 手动关闭所有的连接
conn.close()
similar.foreachPartition(save_hbase)
Apscheduler 定时更新
将文章相似度计算加入到文章画像更新方法中,首先合并最近一个小时的文章完整信息,接着计算 TF-IDF 和 TextRank 权重,并根据 TF-IDF 和 TextRank 权重计算得出关键词和主题词,最后计算文章的词向量及文章的相似度
def update_article_profile():
"""
定时更新文章画像及文章相似度
:return:
"""
ua = UpdateArticle()
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():
textrank_keywords_df, keywordsIndex = ua.generate_article_label()
article_profile = ua.get_article_profile(textrank_keywords_df, keywordsIndex)
ua.compute_article_similar(article_profile)
参考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)
网友评论