1. Spark机器学习基础——特征工程
1.1 对连续值处理
1.1.1 binarizer / 二值化
# Binarizer example: threshold a continuous column into a 0.0 / 1.0 column.
from __future__ import print_function

from pyspark.ml.feature import Binarizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BinarizerExample').getOrCreate()

# Three rows of (id, feature); 5.2 and 8.5 lie above the threshold used below.
continuousDataFrame = spark.createDataFrame(
    [(0, 1.1), (1, 8.5), (2, 5.2)],
    ["id", 'feature'],
)

# Per the Spark docs, values greater than the threshold become 1.0 and
# values less than or equal to it become 0.0.
binarizer = Binarizer(threshold=5.1, inputCol='feature', outputCol='binarized_feature')
binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

spark.stop()

1
1.1.2 按照给定边界离散化
# Bucketizer example: discretize a continuous column using explicit split points.
from __future__ import print_function

from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('BucketizerExample').getOrCreate()

# n + 1 split points define n buckets; the infinite end points make the
# outermost buckets open-ended so every value falls into some bucket.
splits = [-float('inf'), -0.5, 0.0, 0.5, float('inf')]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ['features'])

bucketizer = Bucketizer(splits=splits, inputCol='features', outputCol='bucketFeatures')

# Assign each value to a bucket according to the given boundaries.
bucketedData = bucketizer.transform(dataFrame)

print('Bucketizer output with %d buckets ' % (len(bucketizer.getSplits()) - 1))
bucketedData.show()

spark.stop()

2
1.1.3 quantile_discretizer/按分位数离散化
# QuantileDiscretizer example: bin a continuous column into quantile buckets.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import QuantileDiscretizer

spark = (
    SparkSession.builder
    .appName('QuantileDiscretizerExample')
    .getOrCreate()
)

rows = [
    (0, 18.0),
    (1, 19.0),
    (2, 8.0),
    (3, 5.0),
    (4, 2.2),
    (5, 9.2),
    (6, 14.4),
]

# Build the (id, hour) frame and collapse it to a single partition.
hoursDF = spark.createDataFrame(rows, ["id", "hour"]).repartition(1)

# Discretize "hour" into 3 buckets.
quantileDiscretizer = QuantileDiscretizer(
    numBuckets=3,
    inputCol="hour",
    outputCol="result",
)
bucketizerModel = quantileDiscretizer.fit(hoursDF)
bucketed = bucketizerModel.transform(hoursDF)
bucketed.show()

spark.stop()

3
1.1.4 最大绝对值幅度缩放
# MaxAbsScaler example: rescale vector features by their maximum absolute value.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MaxAbsScaler

spark = (
    SparkSession.builder
    .appName('MaxAbsScalerExample')
    .getOrCreate()
)

rows = [
    (0, Vectors.dense([1.0, 0.1, -8.0])),
    (1, Vectors.dense([2.0, 1.0, -4.0])),
    (2, Vectors.dense([4.0, 10.0, 8.0])),
]
vectorsDF = spark.createDataFrame(rows, ["id", "features"])

maxAbsScaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Fitting computes the maximum absolute values used for scaling; the fitted
# model then rescales each feature into the range [-1, 1].
rescaled = maxAbsScaler.fit(vectorsDF).transform(vectorsDF)

rescaled.select("features", "scaledFeatures").show()

spark.stop()

4
1.1.5 标准化
# StandardScaler example on the libsvm sample data set.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler

spark = (
    SparkSession.builder
    .appName("StandardScalerExample")
    .getOrCreate()
)

libsvmReader = spark.read.format("libsvm")
dataFrame = libsvmReader.load("data/mllib/sample_libsvm_data.txt")

# Scale by standard deviation only; the mean is left untouched (withMean=False).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)

# Fit to compute the summary statistics, then standardize the features.
scaledData = scaler.fit(dataFrame).transform(dataFrame)
scaledData.show()

spark.stop()

5
# StandardScaler example on a small in-memory set of dense vectors.
from __future__ import print_function

from pyspark.ml.feature import StandardScaler
# Vectors is needed to build the dense feature vectors below; the original
# snippet used it without importing it.
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("StandardScalerExample")\
    .getOrCreate()

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

# The original snippet used `scaler` without defining it. Define it here with
# the same settings as the libsvm-based StandardScaler example so this
# snippet runs on its own.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute the summary statistics (mean, variance, ...) needed for scaling.
scalerModel = scaler.fit(dataFrame)

# Standardize the features.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

spark.stop()

6
1.1.6 添加多项式特征
# PolynomialExpansion example: expand feature vectors into polynomial terms.
from __future__ import print_function

# The original imported the misspelled name `PolynomiaExpansion`, which does
# not exist and does not match the class used below.
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PolynomialExpansionExample")\
    .getOrCreate()

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

# Expand each 2-dimensional vector into its polynomial terms up to degree 3.
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

# truncate=False so the full expanded vectors are printed.
polyDF.show(truncate=False)

spark.stop()

7
1.2 对离散值处理
1.2.1 独热向量编码
# OneHotEncoder example: index a string category column, then one-hot encode it.
from __future__ import print_function

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("OneHotEncoderExample")\
    .getOrCreate()

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Map each category string to a numeric index.
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")

# In Spark 2.x OneHotEncoder is a plain Transformer, but from Spark 3.0 it is
# an Estimator that must be fitted before it can transform. Fit when the API
# requires it so the example runs on both version lines.
encoderModel = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoderModel.transform(indexed)
encoded.show()

spark.stop()

8
1.3 对文本型处理
1.3.1 去停用词
# StopWordsRemover example: drop stop words from tokenized sentences.
# Fixes two typos from the original: `print__function` and `StopWordRemove`.
from __future__ import print_function

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("StopWordsRemoverExample")\
    .getOrCreate()

# Each row holds an id and an already-tokenized sentence.
sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

# Uses the remover's default stop-word list (none is passed explicitly).
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

spark.stop()

9
1.3.2 Tokenizer
# Tokenizer example: compare Tokenizer with a regex-based tokenizer.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, RegexTokenizer

spark = (
    SparkSession.builder
    .appName("TokenizerExample")
    .getOrCreate()
)

sentences = [
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat"),
]
sentenceDataFrame = spark.createDataFrame(sentences, ["id", "sentence"])

# Both tokenizers read "sentence" and write "words"; they are applied to the
# input frame independently, so the shared output name does not collide.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\W",
)

# UDF that returns the number of tokens in a row's token list.
countTokens = udf(lambda tokens: len(tokens), IntegerType())

# Plain tokenizer output with a token count per sentence.
tokenized = tokenizer.transform(sentenceDataFrame)
plainView = tokenized.select("sentence", "words")
plainView.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

# Regex tokenizer output with a token count per sentence.
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexView = regexTokenized.select("sentence", "words")
regexView.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

spark.stop()

10
1.3.3 count_vectorizer
# CountVectorizer example: turn token lists into term-count vectors.
from __future__ import print_function

from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CountVectorizerExample")
    .getOrCreate()
)

documents = [
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" ")),
]
wordsDF = spark.createDataFrame(documents, ["id", "words"])

# Vocabulary capped at 3 terms, with minDF=2.0 as the document-frequency cutoff.
vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=3,
    minDF=2.0,
)

# Learn the vocabulary from the corpus, then vectorize the same corpus.
counted = vectorizer.fit(wordsDF).transform(wordsDF)
counted.show(truncate=False)

spark.stop()

11
1.3.4 TF-IDF权重
# TF-IDF example: tokenize sentences, hash to term frequencies, apply IDF.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

spark = (
    SparkSession.builder
    .appName("TfIdfExample")
    .getOrCreate()
)

labelledSentences = [
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat"),
]
sentenceData = spark.createDataFrame(labelledSentences, ["label", "sentence"])

# Step 1: split each sentence into words.
wordsData = Tokenizer(inputCol="sentence", outputCol="words").transform(sentenceData)

# Step 2: hash the words into a 20-dimensional raw term-frequency vector.
termFrequency = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = termFrequency.transform(wordsData)

# Step 3: fit IDF on the raw vectors and rescale them with the fitted model.
inverseDocFreq = IDF(inputCol="rawFeatures", outputCol="features")
rescaledData = inverseDocFreq.fit(featurizedData).transform(featurizedData)

rescaledData.select("label", "features").show()

spark.stop()

12
1.3.5 n-gram语言模型
# NGram example: build bigrams from tokenized sentences.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram

spark = (
    SparkSession.builder
    .appName("NGramExample")
    .getOrCreate()
)

# Sample sentences noted by the original author (not used by the code):
#   Hanmeimei loves LiLei
#   LiLei loves Hanmeimei
tokenRows = [
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"]),
]
wordDataFrame = spark.createDataFrame(tokenRows, ["id", "words"])

# n=2 produces bigrams from each row's "words" list.
bigramTransformer = NGram(n=2, inputCol="words", outputCol="ngrams")
withBigrams = bigramTransformer.transform(wordDataFrame)

withBigrams.select("ngrams").show(truncate=False)

spark.stop()

13
1.4 高级变换
1.4.1 SQL变换
# SQLTransformer example: derive new columns with a SQL statement.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import SQLTransformer

spark = (
    SparkSession.builder
    .appName("SQLTransformerExample")
    .getOrCreate()
)

valueRows = [
    (0, 1.0, 3.0),
    (2, 2.0, 5.0),
]
inputDF = spark.createDataFrame(valueRows, ["id", "v1", "v2"])

# __THIS__ is the placeholder for the input frame; the statement keeps every
# existing column and adds the sum (v3) and product (v4) of v1 and v2.
sqlStatement = "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
sqlTransformer = SQLTransformer(statement=sqlStatement)

derivedDF = sqlTransformer.transform(inputDF)
derivedDF.show()

spark.stop()

14
1.4.2 R公式变换
# RFormula example: build features/label columns from an R-style formula.
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.feature import RFormula

spark = (
    SparkSession.builder
    .appName("RFormulaExample")
    .getOrCreate()
)

clickRows = [
    (7, "US", 18, 1.0),
    (8, "CA", 12, 0.0),
    (9, "NZ", 15, 0.0),
]
columnNames = ["id", "country", "hour", "clicked"]
dataset = spark.createDataFrame(clickRows, columnNames)

# "clicked" is the response; "country" and "hour" are the predictor terms.
rFormula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label",
)

fittedFormula = rFormula.fit(dataset)
transformed = fittedFormula.transform(dataset)
transformed.select("features", "label").show()

spark.stop()

15
网友评论