Spark Machine Learning Basics: Unsupervised Learning
1.1 K-means
from __future__ import print_function
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql import SparkSession
!head -5 data/mllib/sample_kmeans_data.txt
spark = SparkSession\
    .builder\
    .appName("KMeansExample")\
    .getOrCreate()
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Train a K-means clustering model
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Predict (i.e., assign each point to a cluster)
predictions = model.transform(dataset)
# Evaluate clustering quality with the silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Print the predicted cluster assignment for each point
print("Predicted cluster assignments: ")
for row in predictions.select("prediction").collect():
    print(row.asDict())
# Cluster centers
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
spark.stop()
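The example fixes k=2 up front, but in practice k is usually chosen by comparing silhouette scores across candidate values. A minimal sketch, assuming the SparkSession and dataset above are still active (i.e., run it before the spark.stop() call):
# Scan candidate cluster counts; a higher silhouette score is better
for k in range(2, 6):
    candidate = KMeans().setK(k).setSeed(1).fit(dataset)
    score = ClusteringEvaluator().evaluate(candidate.transform(dataset))
    print("k=%d  silhouette=%.4f" % (k, score))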
1.2 Gaussian Mixture Model (GMM)
from __future__ import print_function
from pyspark.ml.clustering import GaussianMixture
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("GaussianMixtureExample")\
    .getOrCreate()
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
gmm = GaussianMixture().setK(2).setSeed(0)
model = gmm.fit(dataset)
print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
spark.stop()
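Unlike K-means, a GMM produces soft assignments. A minimal sketch, assuming the session, dataset, and fitted model above are still active (run before spark.stop()), that inspects the mixture weights and the per-point membership probabilities added by transform():
# Mixture weights sum to 1; 'probability' holds per-component memberships
print("Mixture weights: " + str(model.weights))
assigned = model.transform(dataset)  # adds 'prediction' and 'probability' columns
assigned.select("prediction", "probability").show(truncate=False)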
1.3 Association Rules
!head -5 data/mllib/sample_fpgrowth.txt
from pyspark.mllib.fpm import FPGrowth
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("FPGrowthExample")\
    .getOrCreate()
# Each line of the input file is one transaction: space-separated items
data = spark.sparkContext.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
# Mine frequent itemsets with the RDD-based (mllib) FP-Growth API
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
spark.stop()
# The same algorithm via the DataFrame-based (ml) API
from pyspark.ml.fpm import FPGrowth
spark = SparkSession\
    .builder\
    .appName("FPGrowthExample")\
    .getOrCreate()
df = spark.createDataFrame([
    (0, [1, 2, 5]),
    (1, [1, 2, 3, 5]),
    (2, [1, 2])
], ["id", "items"])
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
# transform() checks each input itemset against all association rules and
# summarizes the implied consequents in a 'prediction' column
model.transform(df).show()
spark.stop()
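The mined rules can also be filtered and applied to unseen baskets. A minimal sketch, assuming the fitted model and session above are still active (the 0.8 threshold and the new basket are made up for illustration):
# Keep only rules whose confidence clears a stricter threshold
model.associationRules.filter("confidence >= 0.8").show()
# Score a new basket; the 'prediction' column lists the implied consequents
newBaskets = spark.createDataFrame([(0, [1, 2])], ["id", "items"])
model.transform(newBaskets).show()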
1.4 LDA Topic Model
from __future__ import print_function
from pyspark.ml.clustering import LDA
from pyspark.sql import SparkSession
!head -5 data/mllib/sample_lda_libsvm_data.txt
spark = SparkSession \
    .builder \
    .appName("LDAExample") \
    .getOrCreate()
# Load the data
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")
# Train the LDA model
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp)+"\n")
# Print the topics
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Transform the dataset (infer per-document topic distributions)
print("transform dataset:\n")
transformed = model.transform(dataset)
transformed.show(truncate=False)
spark.stop()
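transform() adds a topicDistribution vector to every document. A minimal sketch, assuming the session and fitted model above are still active (run before spark.stop()), that extracts each document's dominant topic:
import numpy as np
# 'topicDistribution' is a dense vector of per-topic weights for each document
for row in model.transform(dataset).select("topicDistribution").take(5):
    dist = row["topicDistribution"].toArray()
    print("dominant topic: %d (weight %.3f)" % (int(np.argmax(dist)), float(dist.max())))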
1.5 PCA Dimensionality Reduction
from __future__ import print_function
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("PCAExample")\
    .getOrCreate()
# Build a small toy dataset
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
# Reduce to 3 principal components with PCA
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
spark.stop()
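How much variance the three components retain can be checked directly on the fitted model. A minimal sketch, assuming the model above is still active (run before spark.stop()):
# Proportion of variance captured by each principal component
print("Explained variance: " + str(model.explainedVariance))
print("Total variance retained: %.4f" % model.explainedVariance.toArray().sum())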
1.6 word2vec Word Embeddings
from __future__ import print_function
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("Word2VecExample")\
    .getOrCreate()
# Input documents are in bag-of-words form (token lists)
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
# Set embedding parameters (vector size, minimum count, etc.) and learn the embeddings
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
# Print the words and their vectors
model.getVectors().show()
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
spark.stop()
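The learned embedding space supports nearest-neighbour queries. A minimal sketch, assuming the fitted model above is still active, run before spark.stop() (the query word "Spark" is one of the tokens in the toy corpus):
# The 2 words closest to "Spark" by cosine similarity in the embedding space
model.findSynonyms("Spark", 2).show()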