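FM (Factorization Machine) models are usually trained with one of three kinds of algorithm:
- SGD (stochastic gradient descent)
- ALS (alternating least squares)
- MCMC (Markov chain Monte Carlo)
ALS has already been covered. pyspark fits the model with either AdamW (the default) or gradient descent, selected through the solver parameter ("adamW" or "gd").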
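As a rough illustration of the SGD option in the list above, here is a minimal sketch of a single update step for a standard second-order FM with squared loss. It is not how pyspark implements its solvers. The function names fm_predict and sgd_step, the toy data, and the learning rate are all assumptions made for this example.

```python
# Minimal sketch: one SGD step for a second-order FM with squared loss.
# fm_predict, sgd_step and the toy numbers are illustrative assumptions.

def fm_predict(x, w0, w, V):
    """Return (prediction, per-factor sums s_f = sum_i V[i][f] * x[i])."""
    n, k = len(x), len(V[0])
    s = [sum(V[i][f] * x[i] for i in range(n)) for f in range(k)]
    sq = [sum((V[i][f] * x[i]) ** 2 for i in range(n)) for f in range(k)]
    pairwise = 0.5 * sum(s[f] ** 2 - sq[f] for f in range(k))
    linear = sum(wi * xi for wi, xi in zip(w, x))
    return w0 + linear + pairwise, s


def sgd_step(x, y, w0, w, V, lr):
    """Update w and V in place on one sample; return the new w0."""
    pred, s = fm_predict(x, w0, w, V)
    err = pred - y  # derivative of 0.5 * (pred - y)^2 with respect to pred
    w0 -= lr * err
    for i, xi in enumerate(x):
        if xi == 0.0:
            continue  # zero-valued features receive no gradient
        w[i] -= lr * err * xi
        for f in range(len(s)):
            V[i][f] -= lr * err * (xi * s[f] - V[i][f] * xi * xi)
    return w0


x, y = [1.0, 0.0, 2.0], 3.0
w0, w = 0.0, [0.0, 0.0, 0.0]
V = [[0.1, 0.2], [0.3, 0.1], [0.2, 0.1]]

loss_before = 0.5 * (fm_predict(x, w0, w, V)[0] - y) ** 2
w0 = sgd_step(x, y, w0, w, V, lr=0.01)
loss_after = 0.5 * (fm_predict(x, w0, w, V)[0] - y) ** 2
print(loss_before, loss_after)
```

With a small learning rate the loss on this sample drops after the step, and the row of V that belongs to the zero-valued feature is left unchanged.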
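Model advantages
Works well on highly sparse data; prediction cost is linear in the number of non-zero features (times the factor size), not quadratic.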
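The linear complexity comes from a well-known rewrite of the pairwise interaction sum. The sketch below compares the naive double loop with the rewritten form on random sparse data. The function names pairwise_naive and pairwise_linear, the sizes, and the random seed are assumptions chosen for illustration only.

```python
import random


def pairwise_naive(x, V):
    """O(k * n^2): sum over i < j of <V[i], V[j]> * x[i] * x[j]."""
    n, k = len(x), len(V[0])
    total = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            dot = sum(V[i][f] * V[j][f] for f in range(k))
            total += dot * x[i] * x[j]
    return total


def pairwise_linear(x, V):
    """O(k * nnz): visits only the non-zero entries of x."""
    k = len(V[0])
    nonzero = [(i, xi) for i, xi in enumerate(x) if xi != 0.0]
    total = 0.0
    for f in range(k):
        s = sum(V[i][f] * xi for i, xi in nonzero)
        sq = sum((V[i][f] * xi) ** 2 for i, xi in nonzero)
        total += s * s - sq
    return 0.5 * total


rng = random.Random(0)
n, k = 50, 8
# Roughly 10% of the features are non-zero, mimicking sparse input.
x = [rng.random() if rng.random() < 0.1 else 0.0 for _ in range(n)]
V = [[rng.gauss(0.0, 0.1) for _ in range(k)] for _ in range(n)]

naive = pairwise_naive(x, V)
fast = pairwise_linear(x, V)
print(naive, fast)
```

Both functions return the same value up to floating-point rounding, but the second one never touches a zero-valued feature, which is why FM scales to very sparse inputs.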
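Algorithm overview
Model formula
The first two terms are the global bias and the linear term; the third is the pairwise interaction term.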
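The formula is not reproduced in the text above. Assuming the standard second-order FM model is what is meant, it reads as follows, with n features and factor vectors of dimension k:

```latex
\hat{y}(\mathbf{x}) = w_0 + \sum_{i=1}^{n} w_i x_i
  + \sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j,
\qquad
\langle \mathbf{v}_i, \mathbf{v}_j \rangle = \sum_{f=1}^{k} v_{i,f} \, v_{j,f}
```

Under that assumption, w_0 corresponds to the fitted model's intercept, the weights w_i to linear, the vectors v_i to the rows of factors, and k to the factorSize parameter used in the code below.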
Classification
from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("FMClassifierExample") \
    .getOrCreate()
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a FM model.
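# stepSize: the step size (learning rate) used at each optimization iteration
# factorSize: default 8. Dimensionality of the factor vectors used for the pairwise interaction term.
# Each feature (not each sample) gets an 8-dimensional latent vector, and the weight of a
# feature pair is the inner product of the two vectors.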
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)
# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])
# Train model.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Multiclass classification evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
# Evaluate the configured columns of predictions
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)
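fmModel = model.stages[2]  # stages of the fitted pipeline keep their order; index 2 is the FM model
print("Factors: " + str(fmModel.factors))  # latent factor vector of each feature, used for the interaction term
print("Linear: " + str(fmModel.linear))  # coefficients w of the linear term in the formula
print("Intercept: " + str(fmModel.intercept))  # the global bias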
Regression
from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("FMRegressorExample") \
    .getOrCreate()
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])
# Train model.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))