Introduction
This article uses the PyTorch framework to implement the three models of Neural Collaborative Filtering (NCF): GMF, MLP, and NeuMF. For background on the NCF models themselves, see the earlier post 神经协同过滤NCF(一)之模型介绍篇.
Data Preprocessing
We use the MovieLens ml-1m dataset, and only its ratings.dat file, i.e., users' ratings of movies. For details on this dataset, see 推荐系统数据集之MovieLens.
The preprocessing stage covers the following tasks:
- Binarize the ratings (turning explicit ratings into implicit feedback)
- Split the data into a training set and a test set
- Generate mini-batch training samples
The complete code is as follows:
import random
import pandas as pd
import numpy as np
from copy import deepcopy

random.seed(0)

class DataProcess(object):
    def __init__(self, filename):
        self._filename = filename
        self._loadData()
        self._preProcess()
        self._binarize(self._originalRatings)
        # Deduplicate the 'userId' column to build the user pool
        self._userPool = set(self._originalRatings['userId'].unique())
        self._itemPool = set(self._originalRatings['itemId'].unique())
        print("user_pool size: ", len(self._userPool))
        print("item_pool size: ", len(self._itemPool))
        self._select_Negatives(self._originalRatings)
        self._split_pool(self._preprocessRatings)

    def _loadData(self):
        self._originalRatings = pd.read_csv(self._filename, sep='::', header=None,
                                            names=['uid', 'mid', 'rating', 'timestamp'],
                                            engine='python')
        return self._originalRatings

    def _preProcess(self):
        """
        Re-index users and items from 0. The model's inputs are one-hot indices,
        so user and item ids must fall within the Embedding tables, whose sizes
        are the number of users and the number of items respectively; hence the
        renumbering.
        """
        # 1. Create a new "userId" column that numbers users from 0
        user_id = self._originalRatings[['uid']].drop_duplicates()
        user_id['userId'] = np.arange(len(user_id))  # array of consecutive ids
        # Merge back into the original DataFrame on the "uid" column
        self._originalRatings = pd.merge(self._originalRatings, user_id, on=['uid'], how='left')
        # 2. Re-index items the same way
        item_id = self._originalRatings[['mid']].drop_duplicates()
        item_id['itemId'] = np.arange(len(item_id))
        self._originalRatings = pd.merge(self._originalRatings, item_id, on=['mid'], how='left')
        # Rearrange the columns into ['userId', 'itemId', 'rating', 'timestamp']
        self._originalRatings = self._originalRatings[['userId', 'itemId', 'rating', 'timestamp']]
        print(self._originalRatings)
        print('Range of userId is [{}, {}]'.format(self._originalRatings.userId.min(), self._originalRatings.userId.max()))
        print('Range of itemId is [{}, {}]'.format(self._originalRatings.itemId.min(), self._originalRatings.itemId.max()))

    def _binarize(self, ratings):
        """
        Binarize ratings into 0 or 1 for implicit feedback.
        """
        ratings = deepcopy(ratings)
        # use .loc to avoid pandas' chained-assignment pitfall
        ratings.loc[ratings['rating'] > 0, 'rating'] = 1.0
        self._preprocessRatings = ratings
        # print("binary: \n", self._preprocessRatings)

    def _select_Negatives(self, ratings):
        """
        Collect each user's full negative set and sample 99 negative items per user.
        """
        # Build the user-item interaction table
        interact_status = ratings.groupby('userId')['itemId'].apply(set).reset_index().rename(
            columns={'itemId': 'interacted_items'})
        print("interact_status: \n", interact_status)
        # Every item the user never interacted with is treated as a negative
        interact_status['negative_items'] = interact_status['interacted_items'].apply(lambda x: self._itemPool - x)
        # Randomly sample 99 items from each user's full negative set
        # (random.sample requires a sequence, so convert the set to a list)
        interact_status['negative_samples'] = interact_status['negative_items'].apply(lambda x: random.sample(list(x), 99))
        print("after sampling interact_status: \n", interact_status)
        print("select and rearrange columns")
        self._negatives = interact_status[['userId', 'negative_items', 'negative_samples']]

    def _split_pool(self, ratings):
        """Leave-one-out train/test split."""
        print("sort by timestamp descend")
        # Group by 'userId', then rank each user's records by timestamp in descending order
        ratings['rank_latest'] = ratings.groupby(['userId'])['timestamp'].rank(method='first', ascending=False)
        print(ratings)
        # The record ranked first (the most recent one) goes to the test set
        test = ratings[ratings['rank_latest'] == 1]
        # All earlier records form the training set
        train = ratings[ratings['rank_latest'] > 1]
        # print("test: \n", test)
        # print("train: \n", train)
        print("size of test {0}, size of train {1}".format(len(test), len(train)))
        # Make sure train and test cover the same set of users
        assert train['userId'].nunique() == test['userId'].nunique()
        self.train_ratings = train[['userId', 'itemId', 'rating']]
        self.test_ratings = test[['userId', 'itemId', 'rating']]

    def sample_generator(self, num_negatives):
        # After the merge, train_ratings has columns ['userId', 'itemId', 'rating', 'negative_items']
        train_ratings = pd.merge(self.train_ratings, self._negatives[['userId', 'negative_items']], on='userId')
        # Sample num_negatives items from each user's full negative set into a new "negatives" column
        train_ratings['negatives'] = train_ratings['negative_items'].apply(lambda x: random.sample(list(x), num_negatives))
        print(train_ratings)
        # Build the model inputs: users, items, and target ratings
        users, items, ratings = [], [], []
        for row in train_ratings.itertuples():
            # Positive sample: userId, itemId, and target score 1
            users.append(int(row.userId))
            items.append(int(row.itemId))
            ratings.append(float(row.rating))
            # num_negatives negative samples per positive: userId, itemId, and target score 0
            for i in range(num_negatives):
                users.append(int(row.userId))
                items.append(int(row.negatives[i]))
                ratings.append(float(0))  # negatives are explicitly assigned a rating of 0
        return users, items, ratings
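Before moving on to the models, here is a minimal usage sketch of DataProcess (the data path matches the test code at the end of this article; num_negatives=4 follows the NCF paper's default and is my choice, not something fixed by the class):

# Minimal usage sketch; adjust the path to your local copy of ml-1m
dp = DataProcess("../Data/ml-1m/ratings.dat")
users, items, ratings = dp.sample_generator(num_negatives=4)
print(len(users), len(items), len(ratings))  # three parallel lists of equal length
print(users[:5], items[:5], ratings[:5])     # each positive is followed by its 4 negatives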
The NCF Framework
The NCF framework is the backbone shared by the three models in this article, so the parts common to all of them can live in one place; I therefore define it as a base class. It sets up the Embedding layers, the fully connected layers, and the activation function. Because each model feeds a different input into its final linear layer, that layer is left for the subclasses to define. The NCF framework code is as follows:
import torch
import torch.nn as nn
from abc import abstractmethod

class NCF(object):
    def __init__(self, config, latent_dim_gmf=8, latent_dim_mlp=8):
        self._config = config
        self._num_users = config['num_users']
        self._num_items = config['num_items']
        self._latent_dim_gmf = latent_dim_gmf
        self._latent_dim_mlp = latent_dim_mlp
        # user/item Embedding layers for the MLP model: the index spaces are the number
        # of users and the number of items, and the output is the latent dimension
        self._embedding_user_mlp = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_mlp)
        self._embedding_item_mlp = torch.nn.Embedding(num_embeddings=self._num_items, embedding_dim=self._latent_dim_mlp)
        # user/item Embedding layers for the GMF model, built the same way
        self._embedding_user_gmf = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_gmf)
        self._embedding_item_gmf = torch.nn.Embedding(num_embeddings=self._num_items, embedding_dim=self._latent_dim_gmf)
        # Fully connected layers
        self._fc_layers = torch.nn.ModuleList()
        for in_size, out_size in zip(config['layers'][:-1], config['layers'][1:]):
            self._fc_layers.append(torch.nn.Linear(in_size, out_size))
        # Activation function
        self._logistic = nn.Sigmoid()

    @property
    def fc_layers(self):
        return self._fc_layers

    @property
    def embedding_user_gmf(self):
        return self._embedding_user_gmf

    @property
    def embedding_item_gmf(self):
        return self._embedding_item_gmf

    @property
    def embedding_user_mlp(self):
        return self._embedding_user_mlp

    @property
    def embedding_item_mlp(self):
        return self._embedding_item_mlp

    def saveModel(self):
        torch.save(self.state_dict(), self._config['model_name'])

    @abstractmethod
    def load_preTrained_weights(self):
        pass
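Everything else the base class reads comes from a config dictionary. The concrete gmf_config / mlp_config / neumf_config dictionaries live in the repository linked at the end of this article; the sketch below only illustrates the keys the code above consumes, and the values are my assumptions rather than the repo's settings:

# Hypothetical config sketch; the real dictionaries are defined in the repo
config = {
    'num_users': 6040,               # ml-1m user count after re-indexing
    'num_items': 3706,               # ml-1m item count after re-indexing
    'layers': [16, 64, 32, 16, 8],   # MLP tower; layers[0] should equal 2 * latent_dim_mlp
    'model_name': '../Models/NCF_GMF.model',  # where saveModel() writes the weights
}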
The GMF Model
The structure of the GMF model is shown in the figure below:
[Figure: the GMF model]
As the figure shows, GMF consists of a user embedding layer and an item embedding layer, a linear layer, and a Sigmoid activation. PyTorch ships ready-made Embedding, Linear, and Sigmoid modules, so all of them can be used directly. The class inherits from both the NCF base class and torch.nn.Module.
The code is as follows:
class GMF(NCF, nn.Module):
    def __init__(self, config, latent_dim_gmf):
        nn.Module.__init__(self)
        NCF.__init__(self, config=config, latent_dim_gmf=latent_dim_gmf)
        # Linear layer: input is the latent feature vector, output is a single score
        self._affine_output = nn.Linear(in_features=self._latent_dim_gmf, out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        """
        Forward pass.
        :param user_indices: user Tensor
        :param item_indices: item Tensor
        :return: predicted rating
        """
        # Look up the embeddings for users and items. These are batched operations:
        # passing a tensor of user ids returns one embedding per id.
        user_embedding = self._embedding_user_gmf(user_indices)
        item_embedding = self._embedding_item_gmf(item_indices)
        # Element-wise product of user_embedding and item_embedding; this is the MF step
        element_product = torch.mul(user_embedding, item_embedding)
        # Pass the element-wise product through a sigmoid output neuron
        logits = self._affine_output(element_product)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        pass
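As a quick smoke test, even an untrained GMF can already score (user, item) pairs; a minimal sketch with placeholder config values:

# Untrained weights, so the outputs are arbitrary values in (0, 1)
config = {'num_users': 6040, 'num_items': 3706,
          'layers': [16, 64, 32, 16, 8], 'model_name': 'gmf.model'}
gmf = GMF(config, latent_dim_gmf=8)
scores = gmf(torch.LongTensor([0, 0, 1]), torch.LongTensor([10, 20, 10]))
print(scores.shape)  # torch.Size([3, 1]): one predicted score per (user, item) pair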
The MLP Model
Like GMF, the MLP model only needs to define its own linear output layer. The MLP model code is as follows:
class MLP(NCF, nn.Module):
    def __init__(self, config, latent_dim_mlp):
        nn.Module.__init__(self)
        NCF.__init__(self, config=config, latent_dim_mlp=latent_dim_mlp)
        # Linear layer: input is the last hidden layer of the tower, output is a single score
        self._affine_output = torch.nn.Linear(in_features=config['layers'][-1], out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        """
        :param user_indices: user Tensor
        :param item_indices: item Tensor
        """
        # Look up the embeddings for users and items; as in GMF, this is a batched lookup
        user_embedding = self._embedding_user_mlp(user_indices)
        item_embedding = self._embedding_item_mlp(item_indices)
        vector = torch.cat([user_embedding, item_embedding], dim=-1)  # concat latent vectors
        for layer in self._fc_layers:
            vector = layer(vector)
            vector = torch.nn.ReLU()(vector)
            ## Batch normalization
            # vector = torch.nn.BatchNorm1d()(vector)
            ## Dropout layer
            # vector = torch.nn.Dropout(p=0.5)(vector)
        logits = self._affine_output(vector)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        config = self._config
        gmf_model = GMF(config, config['latent_dim_gmf'])
        if config['use_cuda'] is True:
            gmf_model.cuda()
        # Load the GMF checkpoint (optionally mapping it onto a specific device)
        state_dict = torch.load(self._config['pretrain_gmf'])
        # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
        # map_location = {'cuda:0': 'cpu'})
        gmf_model.load_state_dict(state_dict, strict=False)
        self._embedding_item_mlp.weight.data = gmf_model.embedding_item_gmf.weight.data
        self._embedding_user_mlp.weight.data = gmf_model.embedding_user_gmf.weight.data
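load_preTrained_weights implements the pre-training trick from the NCF paper: first train and save a GMF model, then warm-start the MLP's embedding tables from its checkpoint. A sketch of the intended call sequence, assuming gmf_config and mlp_config dictionaries shaped like the earlier config sketch (the copy only makes sense when latent_dim_mlp equals latent_dim_gmf, and the MLP config must also carry 'latent_dim_gmf', 'use_cuda', and 'pretrain_gmf'):

# Assumed workflow; train and save the GMF model first (see the Trainer class below)
mlp_config['pretrain_gmf'] = gmf_config['model_name']  # path of the saved GMF weights
mlp_model = MLP(mlp_config, mlp_config['latent_dim_mlp'])
mlp_model.load_preTrained_weights()  # copies the GMF user/item embeddings into the MLP towers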
The NeuMF Model
The NeuMF model combines the previous two, so its forward pass is slightly more involved: the GMF and MLP outputs must be concatenated before they are fed into the final linear layer. The NeuMF model code is as follows:
class NeuMF(NCF, nn.Module):
    def __init__(self, config, latent_dim_gmf, latent_dim_mlp):
        nn.Module.__init__(self)
        NCF.__init__(self, config, latent_dim_gmf, latent_dim_mlp)
        # Linear layer: input is the concatenation of the GMF and MLP latent vectors,
        # output is a single score
        self._affine_output = torch.nn.Linear(in_features=config['layers'][-1] + config['latent_dim_gmf'], out_features=1)

    @property
    def affine_output(self):
        return self._affine_output

    def forward(self, user_indices, item_indices):
        user_embedding_mlp = self._embedding_user_mlp(user_indices)
        item_embedding_mlp = self._embedding_item_mlp(item_indices)
        user_embedding_gmf = self._embedding_user_gmf(user_indices)
        item_embedding_gmf = self._embedding_item_gmf(item_indices)
        # MLP branch: concatenate the two latent vectors
        mlp_vector = torch.cat([user_embedding_mlp, item_embedding_mlp], dim=-1)
        # GMF branch: element-wise product of the two latent vectors
        gmf_vector = torch.mul(user_embedding_gmf, item_embedding_gmf)
        for layer in self._fc_layers:
            mlp_vector = layer(mlp_vector)
            mlp_vector = torch.nn.ReLU()(mlp_vector)
        # Concatenate the two branches and feed them into the final linear layer
        vector = torch.cat([mlp_vector, gmf_vector], dim=-1)
        logits = self._affine_output(vector)
        rating = self._logistic(logits)
        return rating

    def load_preTrained_weights(self):
        # Load the pre-trained MLP parameters
        mlp_model = MLP(self._config['mlp_config'], self._config['mlp_config']['latent_dim_mlp'])
        if self._config['use_cuda'] is True:
            mlp_model.cuda()
        state_dict = torch.load(self._config['pretrain_mlp'])
        # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
        # map_location = {'cuda:0': 'cpu'})
        mlp_model.load_state_dict(state_dict, strict=False)
        self._embedding_item_mlp.weight.data = mlp_model.embedding_item_mlp.weight.data
        self._embedding_user_mlp.weight.data = mlp_model.embedding_user_mlp.weight.data
        for idx in range(len(self._fc_layers)):
            self._fc_layers[idx].weight.data = mlp_model.fc_layers[idx].weight.data
        # Load the pre-trained GMF parameters
        gmf_model = GMF(self._config['gmf_config'], self._config['gmf_config']['latent_dim_gmf'])
        if self._config['use_cuda'] is True:
            gmf_model.cuda()
        state_dict = torch.load(self._config['pretrain_gmf'])
        # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
        # map_location = {'cuda:0': 'cpu'})
        gmf_model.load_state_dict(state_dict, strict=False)
        self._embedding_item_gmf.weight.data = gmf_model.embedding_item_gmf.weight.data
        self._embedding_user_gmf.weight.data = gmf_model.embedding_user_gmf.weight.data
        # Fuse the two output layers, weighting them by alpha as in the NCF paper
        self._affine_output.weight.data = self._config['alpha'] * torch.cat(
            [mlp_model.affine_output.weight.data, gmf_model.affine_output.weight.data], dim=-1)
        self._affine_output.bias.data = self._config['alpha'] * (
            mlp_model.affine_output.bias.data + gmf_model.affine_output.bias.data)
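The neumf_config referenced in the test code below is also defined in the repository. Judging from the keys NeuMF and the Trainer consume, it plausibly looks like the following sketch; every value here is an assumption, and 'optimizer' / 'adam_lr' match the pick_optimizer sketch given after the Trainer class:

# Hypothetical neumf_config sketch; see the repo for the real one
neumf_config = {
    'num_users': 6040, 'num_items': 3706,
    'latent_dim_gmf': 8, 'latent_dim_mlp': 8,
    'layers': [16, 64, 32, 16, 8],       # MLP tower; layers[0] = 2 * latent_dim_mlp
    'num_epoch': 20, 'batch_size': 1024, 'num_negative': 4,
    'optimizer': 'adam', 'adam_lr': 1e-3,
    'use_cuda': False, 'device_id': 0,
    'pretrain': False,                   # set True to warm-start from GMF/MLP checkpoints
    'alpha': 0.5,                        # weight used when fusing the two output layers
    'pretrain_gmf': '../Models/NCF_GMF.model',
    'pretrain_mlp': '../Models/NCF_MLP.model',
    'gmf_config': None,                  # full gmf_config dict, needed when pretrain is True
    'mlp_config': None,                  # full mlp_config dict, needed when pretrain is True
    'model_name': '../Models/NCF_NeuMF.model',
}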
The Trainer
A separate Trainer class trains the different models. It defines the optimizer and the loss function, and handles the forward pass, backpropagation, parameter updates, and model saving. It relies on two small helper functions from the repository, pick_optimizer and Construct_DataLoader; minimal sketches of both are given after the class.
The code is as follows:
class Trainer(object):
    def __init__(self, model, config):
        self._config = config
        self._model = model
        # Choose an optimizer
        self._optimizer = pick_optimizer(self._model, self._config)
        # Loss function: binary cross-entropy, suitable for implicit-feedback data
        self._crit = torch.nn.BCELoss()

    def _train_single_batch(self, users, items, ratings):
        """
        Train on a single mini-batch.
        :param users: user Tensor
        :param items: item Tensor
        :param ratings: rating Tensor
        :return:
        """
        if self._config['use_cuda'] is True:
            # Move the batch from CPU to GPU
            users, items, ratings = users.cuda(), items.cuda(), ratings.cuda()
        # Zero the gradients first; otherwise they would accumulate across mini-batches
        self._optimizer.zero_grad()
        # Forward pass: feed users and items into the model
        ratings_pred = self._model(users, items)
        # Compute the loss; ratings_pred.view(-1) flattens the predictions to 1-D
        loss = self._crit(ratings_pred.view(-1), ratings)
        # Backward pass to compute gradients
        loss.backward()
        # Update the parameters with the optimizer
        self._optimizer.step()
        # Extract the loss value as a Python float
        loss = loss.item()
        return loss

    def _train_an_epoch(self, train_loader, epoch_id):
        """
        Train for one epoch, i.e., a full pass over all training samples.
        :param train_loader: a Torch DataLoader
        :param epoch_id: index of the epoch
        :return:
        """
        # Switch the model to training mode, enabling dropout and batch normalization
        self._model.train()
        total_loss = 0
        # Fetch mini-batches and their ids from the DataLoader
        for batch_id, batch in enumerate(train_loader):
            assert isinstance(batch[0], torch.LongTensor)
            # user, item, and rating each hold batch_size entries, e.g. 1024 samples per step
            user, item, rating = batch[0], batch[1], batch[2]
            rating = rating.float()
            loss = self._train_single_batch(user, item, rating)
            print('[Training Epoch {}] Batch {}, Loss {}'.format(epoch_id, batch_id, loss))
            total_loss += loss
        print('Training Epoch: {}, TotalLoss: {}'.format(epoch_id, total_loss))

    def train(self, sampleGenerator):
        # Enable GPU acceleration if configured
        self.use_cuda()
        # Optionally load pre-trained weights
        self.load_preTrained_weights()
        for epoch in range(self._config['num_epoch']):
            print('-' * 20 + ' Epoch {} starts '.format(epoch) + '-' * 20)
            # Re-sample the training data at the start of every epoch
            users, items, ratings = sampleGenerator(num_negatives=self._config['num_negative'])
            # Build a DataLoader over the freshly sampled data
            data_loader = Construct_DataLoader(users=users, items=items, ratings=ratings,
                                               batchsize=self._config['batch_size'])
            # Train for one epoch
            self._train_an_epoch(data_loader, epoch_id=epoch)

    def use_cuda(self):
        if self._config['use_cuda'] is True:
            assert torch.cuda.is_available(), 'CUDA is not available'
            torch.cuda.set_device(self._config['device_id'])
            self._model.cuda()

    def load_preTrained_weights(self):
        if self._config['pretrain'] is True:
            self._model.load_preTrained_weights()

    def save(self):
        self._model.saveModel()
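As noted above, the Trainer depends on two helpers the article does not show: pick_optimizer and Construct_DataLoader. Both come from the linked repository; the sketches below are minimal stand-ins consistent with how they are called, and the 'optimizer' and 'adam_lr' config keys are my assumptions:

from torch.utils.data import DataLoader, TensorDataset

def pick_optimizer(model, config):
    # Minimal sketch: only Adam is handled here
    if config.get('optimizer', 'adam') == 'adam':
        return torch.optim.Adam(model.parameters(), lr=config.get('adam_lr', 1e-3))
    raise ValueError('unsupported optimizer: {}'.format(config['optimizer']))

def Construct_DataLoader(users, items, ratings, batchsize):
    # Wrap the three parallel lists into tensors and batch them; shuffling makes
    # each epoch visit the samples in a different order
    dataset = TensorDataset(torch.LongTensor(users),
                            torch.LongTensor(items),
                            torch.FloatTensor(ratings))
    return DataLoader(dataset, batch_size=batchsize, shuffle=True)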
Test Code
This part is split into a training phase and a testing phase: the model is trained first, and then a few simple spot checks are run.
The code is as follows:
if __name__ == "__main__":
    ####################################################################################
    # NCF: Neural Collaborative Filtering
    ####################################################################################
    # Load and preprocess the data
    dp = DataProcess("../Data/ml-1m/ratings.dat")
    # Initialize the GMF model
    # config = gmf_config
    # model = GMF(config, config['latent_dim_gmf'])
    # # Initialize the MLP model
    # config = mlp_config
    # model = MLP(config, config['latent_dim_mlp'])
    # Initialize the NeuMF model
    config = neumf_config
    model = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])
    # ###############################################################
    # Training phase
    # ###############################################################
    trainer = Trainer(model=model, config=config)
    trainer.train(dp.sample_generator)
    trainer.save()
    # ###############################################################
    # Testing phase
    # ###############################################################
    # Reload the dataset
    dp = DataProcess("../Data/ml-1m/ratings.dat")
    config = neumf_config
    neumf = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])
    state_dict = torch.load("../Models/NCF_NeuMF.model", map_location=torch.device('cpu'))
    neumf.load_state_dict(state_dict, strict=False)
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([1193])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([661])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([914])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([3408])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([1245])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([32])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([4])))
    print(neumf.forward(torch.LongTensor([1]), torch.LongTensor([62])))
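The prints above only spot-check a few (user, item) pairs. Under the leave-one-out protocol of the NCF paper, each user's held-out test item would instead be ranked against the 99 negative_samples drawn during preprocessing. A minimal HR@10 sketch along those lines; it reaches into the private _negatives frame of DataProcess, which is an implementation detail, so treat it as illustration only:

def hit_ratio_at_10(model, dp):
    # Rank each user's held-out positive against their 99 sampled negatives
    test = pd.merge(dp.test_ratings, dp._negatives[['userId', 'negative_samples']], on='userId')
    model.eval()
    hits = 0
    with torch.no_grad():
        for row in test.itertuples():
            candidates = [int(row.itemId)] + [int(i) for i in row.negative_samples]
            users = torch.LongTensor([int(row.userId)] * len(candidates))
            scores = model(users, torch.LongTensor(candidates)).view(-1)
            # A hit means the positive item (index 0) ranks among the 10 highest scores
            hits += int(0 in torch.topk(scores, k=10).indices.tolist())
    return hits / len(test)

print("HR@10:", hit_ratio_at_10(neumf, dp))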
The complete code is available at https://github.com/HeartbreakSurvivor/RsAlgorithms.