简介
本文要介绍的是由上海交通大学的研究人员提出的PNN(Product-based Neural Networks)模型,该模型包含一个embedding层来学习类别数据的分布式表示,此外还包含product层来捕获字段之间的特征交互模式,最后包含一个全连接层去挖掘更高阶的特征交互。相比Deep Crossing模型,PNN模型在输入、EMbedding层、多层神经网络、以及最终的输出层并没有什么结构上的不同,唯一的区别在于PNN引入了Product(乘积)层来代替了Deep Crossing中的Stack层,即不同特征的embedding向量不再是简单地拼接在一起,而是使用Product操作来进行两两交互,更有针对性地获取特征之间的交叉信息。
关于Deep Crossing模型可参见推荐系统之Deep Crossing模型原理以及代码实践。
PNN模型出自论文《Product-based Neural Networks for User Response Prediction》。
模型简介
先看一下整体的模型结构图: PNN模型从自顶向下的视角对模型结构逐步分析:
输出层
PNN模型的输出是一个实数,代表点击率,计算方式如下:
其中,是输出层的参数。是第二个隐层的输出,代表Sigmoid函数,代表第个隐层的维度。L2隐层
第二个隐层的输出由以下公式计算得到:
其中是第一个隐层的输出,是线性整流单元,定义为。L1隐层
第一个隐层L1的输出由以下公式计算得到:
其中是对特征向量的线性操作得到的输出,是对特征向量进行乘积操作得到的输出,是偏置项。Product层
PNN模型对深度学习结构的创新主要体现在Product层的引入,Product层由和组成,下面详细介绍下它们的计算方式。首先定义向量的内积操作:
内积操作 首先对进行逐元素相乘,紧接着再累加起来形成一个标量。在此之后,通过和来分别计算和: 其中和是Product层的权重参数,它们的形状分别取决于和。作者在PNN模型中引入了一个常量信号“1”,即PNN模型图中的红色矩形框所示,通过引入这个常量信号,是的Product层不仅可以捕捉非线性信号,也可以保持线性信号,具体地,的定义如下: 其中是第个特征的嵌入向量。代表的是两两特征的内积交互操作。对于来说,通过观察上式可以发现,公式使用了, 其实就等于嵌入特征向量。
对于来说,论文中提出了两种乘积操作,分别是内积操作和外积操作。使用内积操作的PNN模型也被称之为IPNN(Inner Product-base Neural Network),使用外积操作的PNN模型也被称之为OPNN(Outer Product-base Neural Network)。
IPNN
首先定义向量的内积操作:
内积操作可以可视化如下: 图自https://www.jianshu.com/p/be784ab4abc2 由于有常量信号"1"的存在,线性部分的计算方式为: 非线性部分的计算方式为: 的计算结果是一个数,其计算复杂度为,是的维度,计算需要的时间复杂度,因为一共要进行次内积运算。再由得到的时间复杂度为。因此对于IPNN来说,总的时间复杂度为,其中分别是网络的超参数。这个时间复杂度对于实际应用来说显然过高了,因此论文提出使用矩阵分解的方式来降低复杂度。其中要注意都是对称矩阵,所以可以使用一阶矩阵分解。假设。将原来参数数量为的矩阵,分解为参数为的向量,则:
其中,为: 从而得到如下: 此时的时间复杂度降为。
OPNN
将特征交叉的方式由内积变为外积,则可得到OPNN的形式。外积的示意图如下:
图自https://www.jianshu.com/p/be784ab4abc2定义,则有: image.png 此时的为的矩阵,而是的矩阵,因此的计算时间复杂度为,的计算时间复杂度为。同样为了降低时空复杂度,论文使用了叠加的思想,重新定义了矩阵: 此时的计算时间复杂度变为了。
代码实践
模型部分包含了InnerProduct、OutterProduct、以及PNN模型,代码如下:
import torch
import torch.nn as nn
from BaseModel.basemodel import BaseModel
class InnerProduct(nn.Module):
"""InnerProduct Layer used in PNN that compute the element-wise
product or inner product between feature vectors.
Input shape
- a list of 3D tensor with shape: ``(batch_size,1,embedding_size)``.
Output shape
- 3D tensor with shape: ``(batch_size, N*(N-1)/2 ,1)`` if use reduce_sum. or 3D tensor with shape:
``(batch_size, N*(N-1)/2, embedding_size )`` if not use reduce_sum.
Arguments
- **reduce_sum**: bool. Whether return inner product or element-wise product
"""
def __init__(self, reduce_sum=True):
super(InnerProduct, self).__init__()
self.reduce_sum = reduce_sum
def forward(self, inputs):
embed_list = inputs
row,col = [], []
num_inputs = len(embed_list)
# 这里为了形成n(n-1)/2个下标的组合
for i in range(num_inputs - 1):
for j in range(i + 1, num_inputs):
row.append(i)
col.append(j)
p = torch.cat([embed_list[idx] for idx in row], dim=1) # batch num_pairs k
q = torch.cat([embed_list[idx] for idx in col], dim=1)
# inner_product 中包含了 n(n-1)/2 个 embedding size大小的向量,为了减少计算复杂度,将最后的维度求和,即将embedding size大小变为1
inner_product = p * q
if self.reduce_sum:
# 默认打开,将最后一维的数据累加起来,降低计算复杂度
inner_product = torch.sum(inner_product, dim=2, keepdim=True)
return inner_product
class OutterProduct(nn.Module):
"""
Input shape
- A list of N 3D tensor with shape: ``(batch_size,1,embedding_size)``.
Output shape
- 2D tensor with shape:``(batch_size,N*(N-1)/2 )``.
Arguments
- **filed_size** : Positive integer, number of feature groups.
- **kernel_type**: str. The kernel weight matrix type to use,can be mat,vec or num
"""
def __init__(self, field_size, embedding_size, kernel_type='mat'):
super(OutterProduct, self).__init__()
self.kernel_type = kernel_type
num_inputs = field_size
num_pairs = int(num_inputs * (num_inputs - 1) / 2)
embed_size = embedding_size
if self.kernel_type == 'mat':
self.kernel = nn.Parameter(torch.Tensor(embed_size, num_pairs, embed_size))
elif self.kernel_type == 'vec':
self.kernel = nn.Parameter(torch.Tensor(num_pairs, embed_size))
elif self.kernel_type == 'num':
self.kernel = nn.Parameter(torch.Tensor(num_pairs, 1))
nn.init.xavier_uniform_(self.kernel)
def forward(self, inputs):
embed_list = inputs
row = []
col = []
num_inputs = len(embed_list)
for i in range(num_inputs - 1):
for j in range(i + 1, num_inputs):
row.append(i)
col.append(j)
p = torch.cat([embed_list[idx] for idx in row], dim=1) # batch num_pairs k
q = torch.cat([embed_list[idx] for idx in col], dim=1)
# -------------------------
if self.kernel_type == 'mat':
p.unsqueeze_(dim=1)
# k k* pair* k
# batch * pair
kp = torch.sum(
# batch * pair * k
torch.mul(
# batch * pair * k
torch.transpose(
# batch * k * pair
torch.sum(
# batch * k * pair * k
torch.mul(p, self.kernel), dim=-1), 2, 1),
q),
dim=-1)
else:
# 1 * pair * (k or 1)
k = torch.unsqueeze(self.kernel, 0)
# batch * pair
kp = torch.sum(p * q * k, dim=-1)
# p q # b * p * k
return kp
class PNN(BaseModel):
def __init__(self, config, dense_features_cols, sparse_features_cols):
super(PNN, self).__init__(config)
# 稠密和稀疏特征的数量
self._num_of_dense_feature = dense_features_cols.__len__()
self._num_of_sparse_feature = sparse_features_cols.__len__()
# create embedding layers for all the sparse features
self.embedding_layers = nn.ModuleList([
# 根据稀疏特征的个数创建对应个数的Embedding层,Embedding输入大小是稀疏特征的类别总数,输出稠密向量的维度由config文件配置
nn.Embedding(num_embeddings=sparse_features_cols[idx], embedding_dim=config['embed_dim']) for idx in range(self._num_of_sparse_feature)
])
self.use_inner = config['use_inner']
self.use_outter = config['use_outter']
self.kernel_type = config['kernel_type']
if self.kernel_type not in ['mat', 'vec', 'num']:
raise ValueError("kernel_type must be mat,vec or num")
num_inputs = self._num_of_sparse_feature
# 计算两两特征交互的总数
num_pairs = int(num_inputs * (num_inputs - 1) / 2)
if self.use_inner:
self.innerproduct = InnerProduct()
if self.use_outter:
self.outterproduct = OutterProduct(num_inputs, config['embed_dim'], kernel_type=config['kernel_type'])
# 计算L1全连接层的输入维度
if self.use_outter and self.use_inner:
product_out_dim = 2*num_pairs + self._num_of_dense_feature + config['embed_dim'] * self._num_of_sparse_feature
elif self.use_inner or self.use_outter:
product_out_dim = num_pairs + self._num_of_dense_feature + config['embed_dim'] * self._num_of_sparse_feature
else:
raise Exception("you must specify at least one product operation!")
self.L1 = nn.Sequential(
nn.Linear(in_features=product_out_dim, out_features=config['L2_dim']),
nn.ReLU()
)
self.L2 = nn.Sequential(
nn.Linear(in_features=config['L2_dim'], out_features=1, bias=False),
nn.Sigmoid()
)
def forward(self, x):
# 先区分出稀疏特征和稠密特征,这里是按照列来划分的,即所有的行都要进行筛选
dense_input, sparse_inputs = x[:, :self._num_of_dense_feature], x[:, self._num_of_dense_feature:]
sparse_inputs = sparse_inputs.long()
# 求出稀疏特征的隐向量
sparse_embeds = [self.embedding_layers[i](sparse_inputs[:, i]) for i in range(sparse_inputs.shape[1])]
# 线性信号lz
linear_signal = torch.cat(sparse_embeds, axis=-1)
sparse_embeds = [e.reshape(e.shape[0], 1, -1) for e in sparse_embeds]
if self.use_inner:
inner_product = torch.flatten(self.innerproduct(sparse_embeds), start_dim=1)
product_layer = torch.cat([linear_signal, inner_product], dim=1)
if self.use_outter:
outer_product = self.outterproduct(sparse_embeds)
product_layer = torch.cat([linear_signal, outer_product], dim=1)
if self.use_outter and self.use_inner:
product_layer = torch.cat([linear_signal, inner_product, outer_product], dim=1)
# 将dense特征和sparse特征聚合起来
dnn_input = torch.cat([product_layer, dense_input], axis=-1)
output = self.L1(dnn_input)
output = self.L2(output)
return output
上述代码实现的模型与论文中有些许差异,主要在L1层。实际上,PNN模型在经过对特征的线性和乘积操作之后,并没有结果直接送到上层的L1全连接层,而是在乘积层内部又进行了局部全连接层的转换,分别将线性部分,乘积部分映射成了维的输入向量和,这里的是一个超参数,即L1隐层的输入维度。论文是首先将和相加之和,再送入隐层。这部分操作不具备创新性,并且可以被其他转换操作完全代替。因此为了代码实现简单,上述代码是直接将聚合起来,直接送入了层,这其实也并不影响我们理解论文的思想。
测试部分代码:
import torch
from DeepCross.trainer import Trainer
from PNN.network import PNN
from Utils.criteo_loader import getTestData, getTrainData
import torch.utils.data as Data
pnn_config = \
{
'L2_dim': 256, # 设置L2隐层的输入维度
'embed_dim': 8,
'kernel_type': 'mat',
'use_inner': True,
'use_outter': False,
'num_epoch': 25,
'batch_size': 32,
'lr': 1e-3,
'l2_regularization': 1e-4,
'device_id': 0,
'use_cuda': False,
'train_file': '../Data/criteo/processed_data/train_set.csv',
'fea_file': '../Data/criteo/processed_data/fea_col.npy',
'validate_file': '../Data/criteo/processed_data/val_set.csv',
'test_file': '../Data/criteo/processed_data/test_set.csv',
'model_name': '../TrainedModels/pnn.model'
}
if __name__ == "__main__":
####################################################################################
# PNN 模型
####################################################################################
training_data, training_label, dense_features_col, sparse_features_col = getTrainData(pnn_config['train_file'], pnn_config['fea_file'])
train_dataset = Data.TensorDataset(torch.tensor(training_data).float(), torch.tensor(training_label).float())
test_data = getTestData(pnn_config['test_file'])
test_dataset = Data.TensorDataset(torch.tensor(test_data).float())
pnn = PNN(pnn_config, dense_features_cols=dense_features_col, sparse_features_cols=sparse_features_col)
####################################################################################
# 模型训练阶段
####################################################################################
# # 实例化模型训练器
trainer = Trainer(model=pnn, config=pnn_config)
# 训练
trainer.train(train_dataset)
# 保存模型
trainer.save()
####################################################################################
# 模型测试阶段
####################################################################################
pnn.eval()
if pnn_config['use_cuda']:
pnn.loadModel(map_location=lambda storage, loc: storage.cuda(pnn_config['device_id']))
pnn = pnn.cuda()
else:
pnn.loadModel(map_location=torch.device('cpu'))
y_pred_probs = pnn(torch.tensor(test_data).float())
y_pred = torch.where(y_pred_probs>0.5, torch.ones_like(y_pred_probs), torch.zeros_like(y_pred_probs))
print("Test Data CTR Predict...\n ", y_pred.view(-1))
使用了criteo数据集的一个很小的子集进行训练和测试,输出是点击率预测,判断点击率大于0.5的就认为用户会点击,否则不点击。以下是部分结果,其中’0‘代表预测用户不点击,’1‘代表预测用户点击。
完整代码见:https://github.com/HeartbreakSurvivor/RsAlgorithms/tree/main/PNN。
网友评论