在实际业务中搜索和推荐都背负着转化率的目标:点击转化、下单转化、支付转化等,搜索、推荐核心本质是想通的,核心目标都是在解决信息过剩的问题,底层框架也都是基于检索+排序实现的,不同在于一个是意图明确的主动行为,一个是浏览时的被动填充;检索即确定数据呈现的大致范围,排序将决定呈现给用户的先后顺序,两者同等重要,本节将重点围绕排序展开。
搜索&推荐CTR
如何选择高质量训练样本
-
根据模型要提升的业务目标确定数据范围。如提升点击率,那么正样本数据就是曝光且点击的数据,负样本则是曝光后未点击的行为数据;提升下单率(GMV),那么正样本将是点击未下单的数据,负样本同样也是以点击行为为前置条件未下单的数据。
-
点击行为数据的质量。曝光和点击是整个转化链路中最基层的行为数据,数据量比较大,用户误操作频率相对也比较高,如果单纯的将点击数据作为正样本进行训练那么模型将会存在一定的误差,如视频网站为了规避此类问题,一般会设置一个阈值,如观看时长,当时长超过某个阈值才认为是有效的点击数据,还有电商网站会设置详情页停留时长,或加入搜藏、分享等动作来判别点击行为的权重。
-
在一段时间范围内对商品数据去重。针对同一内容,不同的位置曝光多次,有的用户点了有的没点,几遍是同一位置的商品用户调整了搜索关键词后之前有过点击浏览的商品也不可能再次点击了,针对上述情况可保留一次会话内的点击数据,没点击的可以去除。(如果不做去重,也会影响后续模型离线测试的准确度)
-
样本数量均衡。正负样本数量不应偏差很大,否则会造成样本偏多一方在模型训练时过拟合,模型泛化能力大大下降;导致模型离线测试准去率很高,但auc很低。需要注意的是这里的均衡不是指的正负样本数量完全相等,正负比例1:2也是可以的,具体还要看准确率和auc的效果。
模型选择与训练
业界常用的CTR预估模型有LR和GBDT
-
LR 是最成熟、业界使用最广泛的模型,由于其简单、可解释、易大规模并行、线上预测速度快等优点被广泛应用。缺点:
-
GBDT 是非常经典的统计学习模型,是一种非参数学习模型,能较好的拟合非线性。其基于Boosting思想,迭代计算出一系列简单决策树,其中后一棵树用于拟合他前边所有树的残差。其优点是:学习能力强、可解释性好、能拟合数据中复杂的非线性模式、擅长处理连续型特征、相比LR/SVM降低了人工处理特征的工作量;缺点是:模型复杂度高、资源消耗严重。
-
GBDT+LR 这里将GBDT作为特征处理模块来使用,它的预测结果并不重要,GBDT的分支子树会作为LR模型的输入特征,通过GBDT处理后可将连续性特征离散化处理,也可以自动完成特征交叉,避免了LR模型训练时繁重的特征工程的工作。
通过spark实现GBDT+LR
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.mllib.classification.{ LogisticRegressionModel, LogisticRegressionWithLBFGS }
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.linalg.{ Vector => mlVector }
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.configuration.FeatureType._
import org.apache.spark.mllib.tree.model.{ GradientBoostedTreesModel, Node }
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
object GbdtLr {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().
master("local").
appName("GbdtLr").
getOrCreate()
import spark.implicits._
//1 参数准备
val iteratTree = 10
val iteratDepth = 10
val maxAuc = 0.0
val maxDepth = 15
val numTrees = 10
val minInstancesPerNode = 2
//2 训练样本准备
val dataPath = "hdfs://1.1.1.1:9000/user/data01/"
//2 训练样本准备
val (trainingData, testData) = readLibSvmSampleData(spark, dataPath)
trainingData.cache()
testData.cache()
println(s"trainingData.count(): ${trainingData.count()}")
println(s"testData.count(): ${testData.count()}")
println("trainingData.show")
trainingData.show
val data = trainingData.unionAll(testData)
//3 Gbdt模型训练
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
boostingStrategy.treeStrategy.minInstancesPerNode = minInstancesPerNode
boostingStrategy.numIterations = numTrees
boostingStrategy.treeStrategy.maxDepth = maxDepth
val gbdtModel = GradientBoostedTrees.train(trainingData.rdd, boostingStrategy)
//4 gbdt模型解析:取出所有树的叶子节点
val treeLeafMap = getTreeLeafMap(gbdtModel)
//5 样本数据转换成gbdt叶子节点编号的样本
val lrSampleLablePoint = lrSample(data.rdd, treeLeafMap, gbdtModel)
val lrSplits = lrSampleLablePoint.randomSplit(Array(0.7, 0.3))
val (lrTrainingData, lrTestData) = (lrSplits(0), lrSplits(1))
lrTrainingData.cache()
lrTrainingData.count()
lrTestData.cache()
lrTestData.count()
//6 lr模型训练
val lr = new LogisticRegressionWithLBFGS().setNumClasses(2)
lr.optimizer.setNumIterations(100)
lr.optimizer.setRegParam(0.0)
val lrModel = lr.run(lrTrainingData)
//7 计算模型指标
lrModel.clearThreshold()
val scoreAndLabels = lrTestData.map { point =>
val score = lrModel.predict(point.features)
(score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auc = metrics.areaUnderROC()
val aupr = metrics.areaUnderPR()
println(s"AUC: ${auc}")
println(s"AUPR: ${aupr}")
}
/**
* 读取libSVM格式的文件,生成训练样本和测试样本。
* 1)读取文件
* 2)生成标签索引
* 3)样本处理
* 4)样本划分
*/
def readLibSvmSampleData(
@transient spark: org.apache.spark.sql.SparkSession,
dataPath: String): (Dataset[LabeledPoint], Dataset[LabeledPoint]) = {
import spark.implicits._
// 2.1 读取样本
val dataRead = spark.read.options(Map(("delimiter", "|"), ("header", "false"))).csv(dataPath)
// 2.2 获取样本中所有标签,并且建立索引关系
val featureMap = dataRead.map {
case Row(libSvmFeatrue: String) =>
val items = libSvmFeatrue.split(' ')
val features = items.filter(_.nonEmpty).
filter(f => f.split(':').size == 2).
map { item =>
val indexAndValue = item.split(':')
indexAndValue(0)
}
features
}.flatMap(x => x).distinct().collect().sorted.zipWithIndex.toMap
val numFeatures = featureMap.size
// 2.3 样本校准化处理
val readSampleData = dataRead.map {
case Row(libSvmFeatrue: String) =>
val items = libSvmFeatrue.split(' ')
val click = items(0).toString().toDouble
val features = items.filter(_.nonEmpty).
filter(f => f.split(':').size == 2).
map { item =>
val indexAndValue = item.split(':')
val id = featureMap.getOrElse(indexAndValue(0), -1)
val value = indexAndValue(1).toDouble
(id, value)
}.filter(f => f._1 > 0).sortBy(f => f._1)
val label = if (click > 0) 1.0 else 0.0
LabeledPoint(label, Vectors.sparse(numFeatures, features.map(_._1), features.map(_._2)))
}
// 2.3 划分样本
val splits = readSampleData.randomSplit(Array(0.6, 0.4))
val training = splits(0)
val test = splits(1)
(training, test)
}
/**
* 根据gbdt模型对样本进行转换生成新样本
* 每个样本通过每一棵树,可以找到对应的叶节点,该叶节点就是转换后的新特征。
* @param sampleLablePoint 训练样本,格式为:RDD[LabeledPoint].
* @param treeLeafMap gbdt模型的叶子节点.
* @param gbdtModel gbdt模型
* @return RDD[LabeledPoint]
*/
def lrSample(
sampleLablePoint: RDD[LabeledPoint],
lrFeatureMap: Map[String, Int],
gbdtModel: GradientBoostedTreesModel): RDD[LabeledPoint] = {
val treeNumber = gbdtModel.trees.length
val lrFeatureNum = lrFeatureMap.size
val lrSampleParsed = sampleLablePoint.map { point =>
val label = point.label
val features = point.features
val lrFeatures = ArrayBuffer[Int]()
val lrValues = ArrayBuffer[Double]()
val treeNumber = gbdtModel.trees.size
for (treeIndex <- 0 to (treeNumber - 1)) {
var node = gbdtModel.trees(treeIndex).topNode
while (!node.isLeaf) {
if (node.split.get.featureType == Continuous) {
if (features(node.split.get.feature) <= node.split.get.threshold)
node = node.leftNode.get
else
node = node.rightNode.get
} else {
if (node.split.get.categories.contains(features(node.split.get.feature)))
node = node.leftNode.get
else
node = node.rightNode.get
}
}
val key = treeIndex.toString + '_' + node.id
lrFeatures += lrFeatureMap(key)
lrValues += 1
}
(label, lrFeatures.sorted.toArray, lrValues.toArray)
}
val lrSamplLablePoint = lrSampleParsed.map {
case (label, lrFeatures, lrValues) =>
LabeledPoint(label, Vectors.sparse(lrFeatureNum, lrFeatures, lrValues))
}
(lrSamplLablePoint)
}
/**
* gbdt模型解析叶子节点
* @param gbdtModel gbdt模型.
* @return 返回Map[String, Int],得到所有决策树的叶子节点,以及编号,数据格式为:(树id_叶子节点id, 编号)
*/
def getTreeLeafMap(gbdtModel: GradientBoostedTreesModel): Map[String, Int] = {
val lrFeatureMap = scala.collection.mutable.Map[String, Int]()
var featureId = 0
val treeNumber = gbdtModel.trees.size
for (treeIndex <- 0 to (treeNumber - 1)) {
val treeNodeQueue = collection.mutable.Queue[Node]()
val rootNode = gbdtModel.trees(treeIndex).topNode
treeNodeQueue.enqueue(rootNode)
while (!treeNodeQueue.isEmpty) {
val resNode = treeNodeQueue.dequeue()
if (resNode.isLeaf) {
val key = treeIndex.toString + '_' + resNode.id.toString()
lrFeatureMap(key) = featureId
featureId = featureId + 1
}
if (resNode.leftNode.isDefined)
treeNodeQueue.enqueue(resNode.leftNode.get)
if (resNode.rightNode.isDefined)
treeNodeQueue.enqueue(resNode.rightNode.get)
}
}
(lrFeatureMap.toMap)
}
}
如何进行模型离线测试
- 划分数据集
常用的划分方式有两种,随机划分和按时间划分,其中按时间划分,可将最近两天数据做为测试集,其余数据作为训练集。
网友评论