Neural Collaborative Filtering (NCF), Part 2: Code in Practice

Author: HaloZhang | Published 2020-12-02 21:40

    Introduction

    This post uses the PyTorch framework to implement the three models of Neural Collaborative Filtering (NCF): GMF, MLP, and NeuMF. For background on NCF itself, see the companion post, Neural Collaborative Filtering (NCF), Part 1: Model Introduction.

    Data Preprocessing

    We use the ml-1m dataset from MovieLens, and only its ratings.dat file, i.e., users' ratings of movies. For details on the dataset, see the earlier post on the MovieLens dataset for recommender systems.
    The preprocessing covers the following steps:

    • Binarize the ratings, turning explicit ratings into implicit feedback (a small illustration follows this list)
    • Split the data into a training set and a test set
    • Generate mini-batch training samples
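
    Binarization simply maps every observed rating, whatever its value, to 1.0; unobserved user-item pairs implicitly carry the label 0. A minimal pandas illustration (toy data, not from the real file):

    import pandas as pd
    toy = pd.DataFrame({'userId': [0, 0, 1], 'itemId': [5, 9, 5], 'rating': [4.0, 2.0, 5.0]})
    toy.loc[toy['rating'] > 0, 'rating'] = 1.0  # every observed interaction becomes a positive
    print(toy)  # all three ratings are now 1.0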

    The full code is as follows:

    import random
    import pandas as pd
    import numpy as np
    from copy import deepcopy
    
    random.seed(0)
    
    class DataProcess(object):
        def __init__(self, filename):
            self._filename = filename
            self._loadData()
            self._preProcess()
            self._binarize(self._originalRatings)
            # Deduplicate the 'userId' / 'itemId' columns to build the user and item pools
            self._userPool = set(self._originalRatings['userId'].unique())
            self._itemPool = set(self._originalRatings['itemId'].unique())
            print("user_pool size: ", len(self._userPool))
            print("item_pool size: ", len(self._itemPool))
    
            self._select_Negatives(self._originalRatings)
            self._split_pool(self._preprocessRatings)
    
        def _loadData(self):
            self._originalRatings = pd.read_csv(self._filename, sep='::', header=None, names=['uid', 'mid', 'rating', 'timestamp'],
                                                engine='python')
            return self._originalRatings
    
        def _preProcess(self):
            """
            对user和item都重新编号,这里这么做的原因是因为,模型的输入是one-hot向量,需要把user和item都限制在Embedding的长度之内,
            模型的两个输入的长度分别是user和item的数量,所以要重新从0编号。
            """
            # 1. 新建名为"userId"的列,这列对用户从0开始编号
            user_id = self._originalRatings[['uid']].drop_duplicates().reindex()
            user_id['userId'] = np.arange(len(user_id)) #根据user的长度创建一个数组
            # 将原先的DataFrame与user_id按照"uid"这一列进行合并
            self._originalRatings = pd.merge(self._originalRatings, user_id, on=['uid'], how='left')
    
            # 2. Re-number the items the same way
            item_id = self._originalRatings[['mid']].drop_duplicates()
            item_id['itemId'] = np.arange(len(item_id))
            self._originalRatings = pd.merge(self._originalRatings, item_id, on=['mid'], how='left')
    
            # Rearrange the columns into ['userId', 'itemId', 'rating', 'timestamp']
            self._originalRatings = self._originalRatings[['userId', 'itemId', 'rating', 'timestamp']]
            print(self._originalRatings)
            print('Range of userId is [{}, {}]'.format(self._originalRatings.userId.min(), self._originalRatings.userId.max()))
            print('Range of itemId is [{}, {}]'.format(self._originalRatings.itemId.min(), self._originalRatings.itemId.max()))
    
        def _binarize(self, ratings):
            """
            binarize data into 0 or 1 for implicit feedback
            """
            ratings = deepcopy(ratings)
            ratings.loc[ratings['rating'] > 0, 'rating'] = 1.0  # .loc avoids chained assignment; every observed rating becomes 1.0
            self._preprocessRatings = ratings
            # print("binary: \n", self._preprocessRatings)
    
        def _select_Negatives(self, ratings):
            """
            Select all negative candidates, plus 99 sampled negative items, for each user.
            """
            # Build a per-user set of interacted items
            interact_status = ratings.groupby('userId')['itemId'].apply(set).reset_index().rename(
                columns={'itemId': 'interacted_items'})
            print("interact_status: \n", interact_status)
    
            # Every item the user has never interacted with is a negative candidate
            interact_status['negative_items'] = interact_status['interacted_items'].apply(lambda x: self._itemPool - x)
    
            # Randomly sample 99 negatives per user from the full candidate set
            # (list(...) because random.sample rejects sets on Python >= 3.11)
            interact_status['negative_samples'] = interact_status['negative_items'].apply(lambda x: random.sample(list(x), 99))
            print("after sampling interact_status: \n", interact_status)
    
            print("select and rearrange columns")
            self._negatives = interact_status[['userId', 'negative_items', 'negative_samples']]
    
        def _split_pool(self, ratings):
            """leave one out train/test split """
            print("sort by timestamp descend")
            # 先按照'userID'进行分组,然后根据时间戳降序排列
            ratings['rank_latest'] = ratings.groupby(['userId'])['timestamp'].rank(method='first', ascending=False)
            print(ratings)
    
            # The newest interaction of each user (rank 1) becomes the test set
            test = ratings[ratings['rank_latest'] == 1]
            # Everything older, i.e., the historical data, goes into the training set
            train = ratings[ratings['rank_latest'] > 1]
            # print("test: \n", test)
            # print("train: \n", train)
    
            print("size of test {0}, size of train {1}".format(len(test), len(train)))
    
            # Make sure the training and test sets cover the same users
            assert train['userId'].nunique() == test['userId'].nunique()
    
            self.train_ratings = train[['userId', 'itemId', 'rating']]
            self.test_ratings = test[['userId', 'itemId', 'rating']]
    
        def sample_generator(self, num_negatives):
            # After the merge, train_ratings has columns ['userId', 'itemId', 'rating', 'negative_items']
            train_ratings = pd.merge(self.train_ratings, self._negatives[['userId', 'negative_items']], on='userId')
            # Sample num_negatives items from each user's negative pool into a new 'negatives' column
            train_ratings['negatives'] = train_ratings['negative_items'].apply(lambda x: random.sample(list(x), num_negatives))
            print(train_ratings)
    
            # Build the model inputs: users, items, and target ratings
            users, items, ratings = [], [], []
            for row in train_ratings.itertuples():
                # One positive sample: (userId, itemId, target 1.0)
                users.append(int(row.userId))
                items.append(int(row.itemId))
                ratings.append(float(row.rating))
                # num_negatives negative samples per positive: (userId, itemId, target 0.0)
                for i in range(num_negatives):
                    users.append(int(row.userId))
                    items.append(int(row.negatives[i]))
                    ratings.append(float(0))  # negatives get a hard label of 0
    
            return users, items, ratings
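
    As a quick sanity check, DataProcess and sample_generator fit together like this (a sketch; the path and num_negatives are just example values):

    dp = DataProcess("../Data/ml-1m/ratings.dat")
    users, items, ratings = dp.sample_generator(num_negatives=4)
    # Each training interaction contributes 1 positive and 4 negatives,
    # so the three lists all have the same length.
    print(len(users), len(items), len(ratings))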
    
    

    NCF Framework

    The NCF framework is the shared backbone of the three models, so the pieces they have in common can live in one place; I therefore define it as a base class. It mainly defines the embedding layers, the stack of fully connected layers, and the activation function. Because each model feeds its final linear layer a different input, that layer is left to the subclasses.

    [Figure: The NCF framework]

    The code is as follows:

    from abc import abstractmethod

    import torch
    import torch.nn as nn

    class NCF(object):
        def __init__(self, config, latent_dim_gmf=8, latent_dim_mlp=8):
            self._config = config
            self._num_users = config['num_users']
            self._num_items = config['num_items']
            self._latent_dim_gmf = latent_dim_gmf
            self._latent_dim_mlp = latent_dim_mlp
    
            # User and item Embedding layers for the MLP model: the input sizes are the
            # number of users and the number of items, and both map into a latent space of size latent_dim_mlp
            self._embedding_user_mlp = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_mlp)
            self._embedding_item_mlp = torch.nn.Embedding(num_embeddings=self._num_items, embedding_dim=self._latent_dim_mlp)
            # User and item Embedding layers for the GMF model, mapping into a latent space of size latent_dim_gmf
            self._embedding_user_gmf = torch.nn.Embedding(num_embeddings=self._num_users, embedding_dim=self._latent_dim_gmf)
            self._embedding_item_gmf = torch.nn.Embedding(num_embeddings=self._num_items, embedding_dim=self._latent_dim_gmf)
    
            # Fully connected tower: one Linear layer per consecutive pair of sizes in config['layers']
            self._fc_layers = torch.nn.ModuleList()
            for in_size, out_size in zip(config['layers'][:-1], config['layers'][1:]):
                self._fc_layers.append(torch.nn.Linear(in_size, out_size))
    
            # Output activation
            self._logistic = nn.Sigmoid()
    
        @property
        def fc_layers(self):
            return self._fc_layers
    
        @property
        def embedding_user_gmf(self):
            return self._embedding_user_gmf
    
        @property
        def embedding_item_gmf(self):
            return self._embedding_item_gmf
    
        @property
        def embedding_user_mlp(self):
            return self._embedding_user_mlp
    
        @property
        def embedding_item_mlp(self):
            return self._embedding_item_mlp
    
        def saveModel(self):
            # state_dict() comes from nn.Module, so this works on the subclasses
            torch.save(self.state_dict(), self._config['model_name'])
    
        @abstractmethod
        def load_preTrained_weights(self):
            pass
    
    

    GMF Model

    The structure of the GMF model is shown in the figure below:

    [Figure: GMF model architecture]
    It consists of a user embedding layer and an item embedding layer, a single linear layer, and a sigmoid activation. PyTorch ships ready-made Embedding, Linear, and Sigmoid modules, so all of these can be used directly. The class inherits from both the NCF base class and torch.nn.Module.
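
    In the notation of the NCF paper, with user embedding $p_u$, item embedding $q_i$, and final-layer weights $h$, GMF computes

        $\hat{y}_{ui} = \sigma\left(h^{T}(p_u \odot q_i)\right)$

    where $\odot$ is the element-wise product; fixing $h$ to a vector of ones (and dropping the sigmoid) recovers classic matrix factorization.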
    The code is as follows:
    class GMF(NCF, nn.Module):
        def __init__(self, config, latent_dim_gmf):
            nn.Module.__init__(self)
            NCF.__init__(self, config=config, latent_dim_gmf=latent_dim_gmf)
            # A linear layer mapping the latent vector to a single score
            self._affine_output = nn.Linear(in_features=self._latent_dim_gmf, out_features=1)
    
        @property
        def affine_output(self):
            return self._affine_output
    
        def forward(self, user_indices, item_indices):
            """
            Forward pass
            :param user_indices: user Tensor
            :param item_indices: item Tensor
            :return: predicted rating
            """
            # Look up the embeddings; this is batched: user_indices is a tensor of ids and every id gets its own embedding row
            user_embedding = self._embedding_user_gmf(user_indices)
            item_embedding = self._embedding_item_gmf(item_indices)
            # Element-wise product of user_embedding and item_embedding: this is exactly the MF interaction
            element_product = torch.mul(user_embedding, item_embedding)
            # Feed the element-wise product through the linear layer and a sigmoid
            logits = self._affine_output(element_product)
            rating = self._logistic(logits)
            return rating
    
        def load_preTrained_weights(self):
            # GMF is trained from scratch, so there is nothing to load
            pass
    

    MLP Model

    Like GMF, MLP only needs to define its own output linear layer.

    [Figure: MLP model architecture]
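
    In the same notation, the MLP tower concatenates the two embeddings and passes them through $L$ ReLU layers:

        $z_1 = [p_u; q_i], \qquad \phi_l(z) = \mathrm{ReLU}(W_l z + b_l), \qquad \hat{y}_{ui} = \sigma\left(h^{T}\,\phi_L(\cdots\phi_1(z_1))\right)$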

    The code is as follows:

    
    class MLP(NCF, nn.Module):
        def __init__(self, config, latent_dim_mlp):
            nn.Module.__init__(self)
            NCF.__init__(self, config=config, latent_dim_mlp=latent_dim_mlp)
            # A linear layer mapping the last hidden vector to a single score
            self._affine_output = torch.nn.Linear(in_features=config['layers'][-1], out_features=1)
    
        @property
        def affine_output(self):
            return self._affine_output
    
        def forward(self, user_indices, item_indices):
            """
            :param user_indices: user Tensor
            :param item_indices: item Tensor
            """
            # Look up the embeddings; as in GMF this is batched over the whole
            # tensor of user and item ids
            user_embedding = self._embedding_user_mlp(user_indices)
            item_embedding = self._embedding_item_mlp(item_indices)
            vector = torch.cat([user_embedding, item_embedding], dim=-1)  # concatenate the latent vectors
            for layer in self._fc_layers:
                vector = layer(vector)
                vector = torch.nn.ReLU()(vector)
                ## Batch normalization
                # vector = torch.nn.BatchNorm1d()(vector)
                ## Dropout layer
                # vector = torch.nn.Dropout(p=0.5)(vector)
            logits = self._affine_output(vector)
            rating = self._logistic(logits)
            return rating
    
        def load_preTrained_weights(self):
            config = self._config
            gmf_model = GMF(config, config['latent_dim_gmf'])
            if config['use_cuda'] is True:
                gmf_model.cuda()
            # Load the pretrained GMF parameters onto the chosen device
            state_dict = torch.load(self._config['pretrain_gmf'])
                                    #map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                    #map_location = {'cuda:0': 'cpu'})
            gmf_model.load_state_dict(state_dict, strict=False)
    
            self._embedding_item_mlp.weight.data = gmf_model.embedding_item_gmf.weight.data
            self._embedding_user_mlp.weight.data = gmf_model.embedding_user_gmf.weight.data
    

    NeuMF Model

    NeuMF combines the two previous models, so the forward pass is slightly more involved: the outputs of the GMF branch and the MLP tower are concatenated before going through the final linear layer.

    [Figure: NeuMF model architecture]
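
    The fusion step, again in the paper's notation, concatenates the two branches before the final layer:

        $\hat{y}_{ui} = \sigma\left(h^{T}[\phi^{GMF}; \phi^{MLP}]\right), \qquad \phi^{GMF} = p_u^{G} \odot q_i^{G}$

    where $\phi^{MLP}$ is the output of the last hidden layer of the MLP tower.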

    The code is as follows:

    class NeuMF(NCF, nn.Module):
        def __init__(self, config, latent_dim_gmf, latent_dim_mlp):
            nn.Module.__init__(self)
            NCF.__init__(self, config, latent_dim_gmf, latent_dim_mlp)
    
            # A linear layer whose input is the concatenation of the GMF and MLP latent vectors
            self._affine_output = torch.nn.Linear(in_features=config['layers'][-1] + config['latent_dim_gmf'], out_features=1)
    
        @property
        def affine_output(self):
            return self._affine_output
    
        def forward(self, user_indices, item_indices):
            user_embedding_mlp = self._embedding_user_mlp(user_indices)
            item_embedding_mlp = self._embedding_item_mlp(item_indices)
            user_embedding_gmf = self._embedding_user_gmf(user_indices)
            item_embedding_gmf = self._embedding_item_gmf(item_indices)
    
            # concatenate the two latent vectors
            mlp_vector = torch.cat([user_embedding_mlp, item_embedding_mlp], dim=-1)
            # element-wise product of the two latent vectors
            gmf_vector = torch.mul(user_embedding_gmf, item_embedding_gmf)
    
            for layer in self._fc_layers:
                mlp_vector = layer(mlp_vector)
                mlp_vector = torch.nn.ReLU()(mlp_vector)
    
            vector = torch.cat([mlp_vector, gmf_vector], dim=-1)
            logits = self._affine_output(vector)
            rating = self._logistic(logits)
            return rating
    
        def load_preTrained_weights(self):
            # Load the pretrained MLP parameters
            mlp_model = MLP(self._config['mlp_config'], self._config['mlp_config']['latent_dim_mlp'])
            if self._config['use_cuda'] is True:
                mlp_model.cuda()
            state_dict = torch.load(self._config['pretrain_mlp'])
                                    # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                    # map_location = {'cuda:0': 'cpu'})
            mlp_model.load_state_dict(state_dict, strict=False)
    
            self._embedding_item_mlp.weight.data = mlp_model.embedding_item_mlp.weight.data
            self._embedding_user_mlp.weight.data = mlp_model.embedding_user_mlp.weight.data
            for idx in range(len(self._fc_layers)):
                # copy both the weights and the biases of every hidden layer
                self._fc_layers[idx].weight.data = mlp_model.fc_layers[idx].weight.data
                self._fc_layers[idx].bias.data = mlp_model.fc_layers[idx].bias.data
    
            # Load the pretrained GMF parameters
            gmf_model = GMF(self._config['gmf_config'], self._config['gmf_config']['latent_dim_gmf'])
            if self._config['use_cuda'] is True:
                gmf_model.cuda()
            state_dict = torch.load(self._config['pretrain_gmf'])
                                    # map_location=lambda storage, loc: storage.cuda(device=self._config['device_id']))
                                    # map_location = {'cuda:0': 'cpu'})
            gmf_model.load_state_dict(state_dict, strict=False)
    
            self._embedding_item_gmf.weight.data = gmf_model.embedding_item_gmf.weight.data
            self._embedding_user_gmf.weight.data = gmf_model.embedding_user_gmf.weight.data
    
            self._affine_output.weight.data = self._config['alpha'] * torch.cat([mlp_model.affine_output.weight.data, gmf_model.affine_output.weight.data], dim=-1)
            self._affine_output.bias.data = self._config['alpha'] * (mlp_model.affine_output.bias.data + gmf_model.affine_output.bias.data)
    

    Trainer

    A separate Trainer class handles training for any of the models. It holds the optimizer and the loss function, and implements the forward pass, backpropagation, parameter updates, and model saving.
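
    For implicit feedback the models are trained with the binary cross-entropy (log) loss from the NCF paper,

        $L = -\sum_{(u,i)} y_{ui}\log \hat{y}_{ui} + (1 - y_{ui})\log(1 - \hat{y}_{ui})$

    which is what torch.nn.BCELoss computes (averaged over the batch by default).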
    The code is as follows:

    class Trainer(object):
        def __init__(self, model, config):
            self._config = config
            self._model = model
            # Choose the optimizer
            self._optimizer = pick_optimizer(self._model, self._config)
            # Loss function: binary cross-entropy, suited to implicit-feedback labels
            self._crit = torch.nn.BCELoss()
    
        def _train_single_batch(self, users, items, ratings):
            """
            Train on a single mini-batch
            :param users: user Tensor
            :param items: item Tensor
            :param ratings: rating Tensor
            :return:
            """
            if self._config['use_cuda'] is True:
                # Move the tensors from CPU to GPU
                users, items, ratings = users.cuda(), items.cuda(), ratings.cuda()
    
            # Zero the gradients first; otherwise they would accumulate across mini-batches
            self._optimizer.zero_grad()
            # Forward pass: call the model with users and items
            ratings_pred = self._model(users, items)
            # Binary cross-entropy loss; ratings_pred.view(-1) flattens the predictions to one dimension
            loss = self._crit(ratings_pred.view(-1), ratings)
            # Backward pass to compute gradients
            loss.backward()
            # Let the optimizer (gradient descent or similar) update the parameters
            self._optimizer.step()
            # Extract the loss value as a plain python float
            loss = loss.item()
            return loss
    
        def _train_an_epoch(self, train_loader, epoch_id):
            """
            Train for one epoch, i.e., run every sample in the training set through the model once
            :param train_loader: a torch DataLoader
            :param epoch_id: index of the current epoch
            :return:
            """
            # Put the model in training mode, enabling dropout and batch normalization
            self._model.train()
            total_loss = 0
            # Fetch mini-batch ids and data from the DataLoader
            for batch_id, batch in enumerate(train_loader):
                assert isinstance(batch[0], torch.LongTensor)
                # With batch_size 1024, user, item and rating each hold 1024 entries per batch
                user, item, rating = batch[0], batch[1], batch[2]
                rating = rating.float()
                loss = self._train_single_batch(user, item, rating)
                print('[Training Epoch {}] Batch {}, Loss {}'.format(epoch_id, batch_id, loss))
                total_loss += loss
            print('Training Epoch: {}, TotalLoss: {}'.format(epoch_id, total_loss))
    
        def train(self, sampleGenerator):
            # Optionally move the model to the GPU
            self.use_cuda()
            # Optionally load pretrained weights
            self.load_preTrained_weights()
    
            for epoch in range(self._config['num_epoch']):
                print('-' * 20 + ' Epoch {} starts '.format(epoch) + '-' * 20)
                # Re-sample the training data at the start of every epoch
                users, items, ratings = sampleGenerator(num_negatives=self._config['num_negative'])
                # Build a DataLoader over the sampled data
                data_loader = Construct_DataLoader(users=users, items=items, ratings=ratings,
                                                   batchsize=self._config['batch_size'])
                # Train for one epoch
                self._train_an_epoch(data_loader, epoch_id=epoch)
    
        def use_cuda(self):
            if self._config['use_cuda'] is True:
                assert torch.cuda.is_available(), 'CUDA is not available'
                torch.cuda.set_device(self._config['device_id'])
                self._model.cuda()
    
        def load_preTrained_weights(self):
            if self._config['pretrain'] is True:
                self._model.load_preTrained_weights()
    
        def save(self):
            self._model.saveModel()
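
    The helpers pick_optimizer and Construct_DataLoader are imported from elsewhere in the repository and not shown in the post. Minimal sketches consistent with how they are called here (the config keys 'optimizer', 'adam_lr' and 'sgd_lr' are assumptions, not taken from the post) might look like:

    import torch
    from torch.utils.data import DataLoader, TensorDataset

    def pick_optimizer(model, config):
        # Hypothetical helper: choose an optimizer by name from the config
        if config.get('optimizer', 'adam') == 'adam':
            return torch.optim.Adam(model.parameters(), lr=config.get('adam_lr', 1e-3))
        return torch.optim.SGD(model.parameters(), lr=config.get('sgd_lr', 1e-2))

    def Construct_DataLoader(users, items, ratings, batchsize):
        # Wrap the three python lists into tensors and a shuffled DataLoader;
        # users and items must be LongTensors because they index embedding tables
        dataset = TensorDataset(torch.LongTensor(users),
                                torch.LongTensor(items),
                                torch.FloatTensor(ratings))
        return DataLoader(dataset, batch_size=batchsize, shuffle=True)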
    
    

    Test Code

    There are two stages: first train the model, then run a few simple spot checks. The configs the script references are sketched right below.
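
    The script uses gmf_config, mlp_config and neumf_config, which are defined elsewhere in the repository. A trimmed, hypothetical neumf_config showing only the keys the classes above actually read (all values are illustrative) might look like:

    neumf_config = {
        'num_users': 6040,              # ml-1m has 6040 users
        'num_items': 3706,              # and 3706 distinct rated movies
        'latent_dim_gmf': 8,
        'latent_dim_mlp': 8,
        'layers': [16, 64, 32, 16, 8],  # first entry = 2 * latent_dim_mlp
        'num_epoch': 20,
        'batch_size': 1024,
        'num_negative': 4,
        'use_cuda': False,
        'device_id': 0,
        'pretrain': False,              # when True, also set 'pretrain_gmf', 'pretrain_mlp',
                                        # 'gmf_config', 'mlp_config' and 'alpha'
        'model_name': '../Models/NCF_NeuMF.model',
    }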
    The code is as follows:

    
    if __name__ == "__main__":
        ####################################################################################
        # NCF: Neural Collaborative Filtering
        ####################################################################################
    
        # Load and preprocess the data
        dp = DataProcess("../Data/ml-1m/ratings.dat")
    
        # Initialize the GMF model
        # config = gmf_config
        # model = GMF(config, config['latent_dim_gmf'])
    
        # # Initialize the MLP model
        # config = mlp_config
        # model = MLP(config, config['latent_dim_mlp'])
    
        # Initialize the NeuMF model
        config = neumf_config
        model = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])
    
        # ###############################################################
        # Model training stage
        # ###############################################################
        trainer = Trainer(model=model, config=config)
        trainer.train(dp.sample_generator)
        trainer.save()
    
        # ###############################################################
        # Model testing stage
        # ###############################################################
    
        # Load the dataset again
        dp = DataProcess("../Data/ml-1m/ratings.dat")
    
        config = neumf_config
        neumf = NeuMF(config, config['latent_dim_gmf'], config['latent_dim_mlp'])
        state_dict = torch.load("../Models/NCF_NeuMF.model", map_location=torch.device('cpu'))
        neumf.load_state_dict(state_dict, strict=False)
    
        # Spot-check predicted scores for user 1 on a few items
        print(neumf(torch.LongTensor([1]), torch.LongTensor([1193])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([661])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([914])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([3408])))

        print(neumf(torch.LongTensor([1]), torch.LongTensor([1245])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([32])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([4])))
        print(neumf(torch.LongTensor([1]), torch.LongTensor([62])))
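
    These spot checks only print raw scores. The standard NCF evaluation, and the reason _select_Negatives samples 99 negatives per user, is leave-one-out HR@K / NDCG@K: score the held-out positive together with its 99 sampled negatives and check where the positive ranks. A sketch using the fields DataProcess already exposes (it reaches into dp._negatives for brevity):

    import math

    def hit_ratio_and_ndcg(model, dp, K=10):
        model.eval()
        hits, ndcgs = [], []
        test = dp.test_ratings.merge(dp._negatives[['userId', 'negative_samples']], on='userId')
        for row in test.itertuples():
            # candidate list: the held-out positive first, then the 99 sampled negatives
            items = [int(row.itemId)] + [int(i) for i in row.negative_samples]
            users = torch.LongTensor([int(row.userId)] * len(items))
            with torch.no_grad():
                scores = model(users, torch.LongTensor(items)).view(-1)
            # 0-based rank of the positive among the 100 candidates
            rank = int((scores > scores[0]).sum().item())
            hits.append(1.0 if rank < K else 0.0)
            ndcgs.append(math.log(2) / math.log(rank + 2) if rank < K else 0.0)
        return sum(hits) / len(hits), sum(ndcgs) / len(ndcgs)

    print(hit_ratio_and_ndcg(neumf, dp, K=10))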
    

    The full code is available at https://github.com/HeartbreakSurvivor/RsAlgorithms
