与Spark_Core建立连接
Spark_Core的Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
访问HDFS集群
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>your-hdfs-version</version>
</dependency>
在程序中导包
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
初始化Spark_Core
创建连接
// SparkConf Spark基础配置对象
// setMaster 设置Spark运行环境
// setAppName 设置应用名称
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("wordCount")
// SparkContext 与Spark框架建立连接
val sc: SparkContext = new SparkContext(sparkConf)
关闭连接
// 关闭连接
sc.stop()
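把上面的创建连接与关闭连接串起来, 下面是一个最小的可运行示意(数据直接用内存集合构造, 对象名 SparkCoreDemo 与示例数据均为示意):
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkCoreDemo {
  def main(args: Array[String]): Unit = {
    // 建立连接
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("wordCount")
    val sc: SparkContext = new SparkContext(sparkConf)
    // 一个简单的word count: 内存集合 => RDD => 统计
    val lines: RDD[String] = sc.makeRDD(List("hello spark", "hello scala"))
    val wordCount: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.collect().foreach(println)
    // 关闭连接
    sc.stop()
  }
}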
命令行中运行Spark JAR
bin/spark-submit
# 主类
--class org.apache.spark.examples.SparkPi
# 环境
--master local[2]
# jar包位置
./examples/jars/spark-examples_2.12-3.0.0.jar
# 参数
10
弹性分布式数据集 (RDD)
并行集合
val array: Array[Int] = Array(1, 2, 3, 4, 5)
val rdd: RDD[Int] = sc.makeRDD(array) // 个人写法
val rdd: RDD[Int] = sc.parallelize(array) // 文档标准写法
rdd.collect().foreach(println)
外部数据集
// 从 src/main/resources/full_gitee.csv 中读取; 支持通配符写法 如src/main/resources/*
val lines: RDD[String] = sc.textFile("src/main/r9esources/full_gitee.csv")
lines.collect().foreach(println)
RDD 操作
// 读取文本文件
val lines:RDD[String] = sc.textFile("src/main/resources/full_gitee.csv")
// 获取 每一行的长度
val lineLengths: RDD[Int] = lines.map(s => s.length)
// 用reduce累计
val totalLength: Int = lineLengths.reduce((a, b) => a + b)
print(totalLength)
常用RDD算子
map 对每个元素执行传入的函数
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD: RDD[Int] = rdd.map((_: Int)*2)
mapRDD.collect().foreach(println)
flatMap 扁平化处理
val rdd: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)), 2)
val flatMapRDD: RDD[Any] = rdd.flatMap {
case list: List[_] => list
case d => List(d)
}
flatMapRDD.collect().foreach(println)
mapPartitions 以分区为单位进行数据转换操作
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// mapPartitions 可以以分区为单位进行数据转换操作
// 但是会将整个分区的数据加载到内存进行引用
// 处理完的数据不会被马上释放, 因为存在对象引用
// 在内存较小,数据量较大的场合下,容易出现内存溢出
val mapRDD: RDD[Int] = rdd.mapPartitions(
(iter: Iterator[Int]) => {
println(">>>> ")
iter.map(_ * 2)
}
)
mapRDD.collect().foreach(println)
mapPartitionsWithIndex 以分区为单位进行转换, 并可获取分区索引(下例只保留1号分区的数据)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
// index 为分区索引
(index: Int, iter: Iterator[Int]) => {
if (index == 1){
iter
}else{
Nil.iterator
}
}
)
mapRDD.collect().foreach(println)
glom 将同一个分区里的元素合并到一个array里
val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD: RDD[Array[Any]] = rdd.glom()
glomRDD.collect().foreach(data => println(data.mkString(",")))
saveAsTextFile 保存到文件
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD: RDD[Int] = rdd.map(_ * 2)
mapRDD.saveAsTextFile("src/main/resources/output")
groupBy 分组
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupByRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
groupByRDD.collect().foreach(println)
filter 检索过滤
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
filterRDD.collect().foreach(println)
sample 取样
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
val str: String = rdd.sample(
// 第一个参数表示 抽取数据后是否将数据放回 true(放回), false(丢弃)
false,
// 第二个参数表示 数据源中每条数据被抽取的概率
0.4
// 抽取数据 随机算法的种子
// 1
).collect().mkString(",")
println(str)
distinct 去重
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 2)
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
coalesce 减少 分区
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 4)
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.saveAsTextFile("src/main/resources/output")
repartition 增加 分区
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 2)
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("src/main/resources/output")
sortBy 排序
val rdd: RDD[Int] = sc.makeRDD(List(2, 4, 1, 3), 2)
val newRDD: RDD[Int] = rdd.sortBy(num => num)
newRDD.saveAsTextFile("src/main/resources/output")
count 计算数据集长度
val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.collect()
val len: Long = rdd.count()
print(len)
sortByKey 按键排序
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)
val sortByKeyrdd: RDD[(String, Int)] = rdd.sortByKey()
sortByKeyrdd.collect().foreach(println)
groupByKey 按键分组
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)
val groupByKeyRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupByKeyRDD.collect().foreach(println)
reduceByKey 按键分组 并执行 reduce 操作
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)
val reduceByKeyRDD: RDD[(String, Int)] = rdd.reduceByKey((a, b) => (a + b))
reduceByKeyRDD.collect().foreach(println)
与Spark_SQL建立连接
Spark_SQL的Maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
在程序中导包
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
初始化Spark_SQL
创建连接
// 创建SparkSQL运行环境
val sparkconf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("snakeSparkSQL")
.set("spark.sql.warehouse.dir","file:///")
val spark: SparkSession = new SparkSession.Builder().config(sparkconf).getOrCreate()
关闭连接
// 关闭连接
spark.stop()
SQL
数据源
{"name": "yr", "age": 18}
{"name": "yr1", "age": 14}
{"name": "yr2", "age": 13}
{"name": "yr3", "age": 2}
{"name": "yr4", "age": 2}
{"name": "yr5", "age": 5}
从数据源读取数据
// 将数据读取为 DataFrame; 数据源可以是 csv、jdbc、json、orc、table、text、textFile 等, 也可以用 format/load/option/schema 等通用方法指定
val df: DataFrame = spark.read.json("src/main/resources/data.json")
// df.show()
将DataFrame转为临时表
df.createOrReplaceTempView("dataTable")
对临时表使用SQL语句
spark.sql("select * from dataTable").show()
spark.sql("select name,age from dataTable").show()
spark.sql("select avg(age) from dataTable").show()
直接对DataFrame使用DSL
df.select("name", "age").show()
// 在使用 DataFrame 时, 如果涉及到转换操作, 需要引入转换规则
// 这里引入的是 val spark: SparkSession 中的隐式转换规则
import spark.implicits._
df.select($"age" + 1).show()
DataSet
// DataFrame 其实是 Dataset[Row], 所以 DataFrame 的方法 DataSet 均可以使用
// toDS 依赖 import spark.implicits._ 提供的隐式转换
val seq: Seq[Int] = Seq(1, 2, 3, 4)
val ds: Dataset[Int] = seq.toDS()
ds.show()
RDD <=> DataFrame
// 创建RDD
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "yr1", 11), (2, "yr2", 12), (3, "yr3", 13), (4, "yr4", 14)))
// RDD 转换为 DataFrame
val df: DataFrame = rdd.toDF("id", "name", "age")
// DataFrame 转换为 RDD
val rdd1: RDD[Row] = df.rdd
DataSet <=> DataFrame
// DF 转换为 DS
val ds: Dataset[User] = df.as[User]
// DS 转换为 DF
val newdf: DataFrame = ds.toDF()
RDD <=> DataSet
// RDD => DS
val newds: Dataset[User] = rdd.map {
case (id, name, age) =>
User(id, name, age)
}.toDS()
// DS => RDD
val rdd2: RDD[User] = newds.rdd
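上面几组转换都依赖 spark.implicits._ 的隐式转换, 其中的 User 是一个样例类(文中未给出定义); 下面是一个把三种转换串起来的可运行示意, User 的字段与对象名 RddDfDsDemo 均为假设:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
// 示意用的样例类, 字段与上文的三元组对应(假设)
case class User(id: Int, name: String, age: Int)
object RddDfDsDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = new SparkSession.Builder().master("local[*]").appName("RddDfDs").getOrCreate()
    import spark.implicits._
    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "yr1", 11), (2, "yr2", 12)))
    val df: DataFrame = rdd.toDF("id", "name", "age") // RDD => DataFrame
    val ds: Dataset[User] = df.as[User]               // DataFrame => DataSet
    val rowRDD: RDD[Row] = ds.toDF().rdd              // DataSet => DataFrame => RDD
    ds.show()
    rowRDD.collect().foreach(println)
    spark.stop()
  }
}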
自定义UDF函数的使用
spark.udf.register("prefixName", (name: String) =>{
"name" + name
})
spark.sql("select prefixName(name), age from user").show()
完整示例 自定义聚合函数类: 计算年龄平均值
弱类型转换方法
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object mySPKsql {
def main(args: Array[String]): Unit = {
val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("snakeSparkSQL").set("spark.sql.warehouse.dir","file:///")
val spark: SparkSession = new SparkSession.Builder().config(sparkconf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("src/main/resources/data.json")
df.createOrReplaceTempView("user")
spark.udf.register("ageAVG", new MyAvgUDAF)
spark.sql("select ageAVG(age) from user").show()
spark.stop()
}
/**
* 自定义聚合函数类: 计算年龄平均值
*/
class MyAvgUDAF extends UserDefinedAggregateFunction{
// 输入数据的结构
override def inputSchema: StructType = {
StructType(
Array(
StructField("age", LongType)
)
)
}
// 缓冲区数据结构
override def bufferSchema: StructType = {
StructType(
Array(
StructField("total", LongType),
StructField("count", LongType)
)
)
}
// 函数计算结果的数据类型
override def dataType: DataType = LongType
// 函数的稳定性
override def deterministic: Boolean = true
// 缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// buffer(0) = 0L
// buffer(1) = 0L
buffer.update(0, 0L)
buffer.update(1, 0L)
}
// 根据输入的值来更新缓冲区
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer.update(0, buffer.getLong(0) + input.getLong(0))
buffer.update(1, buffer.getLong(1) + 1)
}
// 缓冲区数据合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
}
// 计算平均值
override def evaluate(buffer: Row): Any = {
buffer.getLong(0)/buffer.getLong(1)
}
}
}
强类型转换方法
>=Spark 3.0
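Spark 3.0 之后推荐继承 Aggregator 实现强类型聚合函数, 再用 functions.udaf 注册为 SQL 函数。下面是一个计算年龄平均值的示意写法(沿用上文的 user 临时表与 age 字段, AvgBuffer、MyAvgAggregator 为示意命名):
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}
// 缓冲区样例类(示意)
case class AvgBuffer(var total: Long, var count: Long)
// 强类型聚合函数: IN = Long, BUF = AvgBuffer, OUT = Long
class MyAvgAggregator extends Aggregator[Long, AvgBuffer, Long] {
  // 缓冲区初始化
  override def zero: AvgBuffer = AvgBuffer(0L, 0L)
  // 根据输入的值更新缓冲区
  override def reduce(buffer: AvgBuffer, age: Long): AvgBuffer = {
    buffer.total += age
    buffer.count += 1
    buffer
  }
  // 合并缓冲区
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.total += b2.total
    b1.count += b2.count
    b1
  }
  // 计算平均值
  override def finish(buffer: AvgBuffer): Long = buffer.total / buffer.count
  // 缓冲区与输出的编码器
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
// 在 main 中注册并使用(与弱类型示例相同的 user 临时表)
spark.udf.register("ageAVG", functions.udaf(new MyAvgAggregator))
spark.sql("select ageAVG(age) from user").show()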
通用读取数据
spark.read.format("json").load("data.json")
不创建临时表 执行SQL
spark.sql("select * from json.`data.json`").show
通用保存数据
df.write.format("json").save("output")
保存模式
/**
* error 默认模式 如果文件已存在 则抛出异常
* append 如果文件已存在则追加
* overwrite 如果文件已存在则覆盖
* ignore 如果文件已存在则忽略
*/
df.write.format("json").mode("overwrite").save("output")
SparkSQL 连接MySQL
Maven配置
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
读取数据
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/chu?useUnicode=true&characterEncoding=utf8")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "root")
.option("dbtable", "player")
.load()
df.show()
写入数据
val properties = new Properties()
properties.put("driver", "com.mysql.jdbc.Driver")
properties.put("user", "root")
properties.put("password", "root")
val url = "jdbc:mysql://172.16.1.110:3306/demo1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8"
dataFrame.write.mode(SaveMode.Append).jdbc(url, "demo1_1", properties)
读写Hive
def main(args: Array[String]): Unit = {
val sparkconf: SparkConf = new SparkConf()
.setMaster("spark://172.16.1.120:7077")
.setAppName("snakeSparkSQL")
.set("spark.sql.warehouse.dir","hdfs://172.16.1.120:10000/user/hive/warehouse")
.set("hive.metastore.uris","thrift://172.16.1.120:9083")
val spark: SparkSession = new SparkSession
.Builder()
.config(sparkconf)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val dataFrame: DataFrame = spark.read.table("default.stu2")
dataFrame.printSchema()
dataFrame.show()
dataFrame.write.format("Hive").mode(SaveMode.Append).saveAsTable("stu2")
spark.sql("drop table stu2")
spark.stop()
}
Spark MLlib
MLlib
MLlib 基本数据类型
VectorsAndLabeledPoint
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
object VectorsAndLabeledPoint {
def main(args: Array[String]): Unit = {
val vector: linalg.Vector = Vectors.dense(1, 2, 3)
val point: LabeledPoint = LabeledPoint(1, vector)
println(point.features)
println(point.label)
}
}
MLlib 基本运算
Statistics 求 均值、方差、最大值
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object MeanAndVariance {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = new SparkSession.Builder().master("local").appName("MeanAndVariance").getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.read.option("sep", " ").csv("src/main/resources/Kmeans.csv")
val rdd: RDD[linalg.Vector] = dataFrame.rdd.map((row: Row) => Vectors.dense(row.getAs[String]("_c0").toDouble))
val summary: MultivariateStatisticalSummary = Statistics.colStats(rdd)
println("均值:" + summary.mean)
println("标准差" + summary.variance)
println("最大值" + summary.max)
sparkSession.close()
}
}
求众数
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object ModeNumber {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = new SparkSession.Builder()
.master("local")
.appName("ApproxQuantile")
.getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/ApproxQuantile.csv")
.map((row: Row) =>
row.getAs[String]("_c0").toDouble
).toDF("x")
val ModeNumber: Double = dataFrame.groupBy("x").count().orderBy(desc("count")).rdd.first().getAs[Double]("x")
println(ModeNumber)
sparkSession.close()
}
}
approxQuantile 求分位数 中位数
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object ApproxQuantile {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = new SparkSession.Builder()
.master("local")
.appName("ApproxQuantile")
.getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/ApproxQuantile.csv")
.map((row: Row) =>
row.getAs[String]("_c0").toDouble
).toDF("x")
/**
* def approxQuantile(col: String, probabilities: Array[Double], relativeError: Double): Array[Double]
* col 列名
* probabilities 分位数概率列表, 每个数字必须属于 [0, 1]。例如, 0 是最小值, 0.5 是中位数, 1 是最大值。
* relativeError 要达到的相对目标精度(大于或等于 0)。如果设置为 0, 则计算精确的分位数, 这可能非常昂贵。大于 1 的值也会被接受, 但结果与 1 相同。
*/
val doubles: Array[Double] = dataFrame.stat.approxQuantile("x", Array(0.5), 0)
println(doubles.mkString("Array(", ", ", ")"))
sparkSession.close()
}
}
线性回归
LinearRegression
TODO 数据 =>
需求量,价格,输入
100,5,1000
75,7,600
80,6,1200
70,6,500
50,8,30
65,7,400
90,5,1300
100,4,1100
110,3,1300
60,9,300
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object LinearRegression {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = new SparkSession
.Builder()
.master("local[*]")
.appName("LinearRegression")
.getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/LinearRegression.csv").toDF("y", "x1", "x2")
val ds: Dataset[LabeledPoint] = dataFrame.map((row: Row) => {
LabeledPoint(row.getAs[String]("y").toDouble,
Vectors.dense(
row.getAs[String]("x1").toDouble,
row.getAs[String]("x2").toDouble
)
)
}).cache()
val model: LinearRegressionModel = LinearRegressionWithSGD.train(ds.rdd, 100, 0.1)
val result: Double = model.predict(Vectors.dense(2.0, 2000.0))
println(result)
sparkSession.close()
}
}
聚类
Kmeans
1 2
1 1
1 3
2 2
3 4
4 3
2 2
4 4
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object MLlibKmeans {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = new SparkSession.Builder().master("local[1]").appName("Kmeans").getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.read.option("delimiter", " ").csv("src/main/resources/Kmeans.csv").toDF("x", "y")
val rdd: RDD[linalg.Vector] = dataFrame.rdd.map((row: Row) => {
Vectors.dense(row.getAs[String]("x").toDouble, row.getAs[String]("y").toDouble)
})
val model: KMeansModel = KMeans.train(rdd, 4, 2000)
model.clusterCenters.foreach(println)
sparkSession.close()
}
}
ML
回归
LogisticRegression 逻辑回归
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
// 回归
object Pip_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().master("local").appName("Pip").getOrCreate()
import session.implicits._
val ds: Dataset[LabeledPoint] = session.read.csv("src/main/resources/LinearRegression.csv")
  .map((row: Row) => LabeledPoint(
    row.getAs[String]("_c0").toDouble,
    Vectors.dense(row.getAs[String]("_c1").toDouble, row.getAs[String]("_c2").toDouble)
  ))
ds.show()
println("=========================================================================")
val lr: LogisticRegression = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
println( lr.explainParams() )
println("=========================================================================")
val model: LogisticRegressionModel = lr.fit(ds)
println( model.parent.extractParamMap )
println("=========================================================================")
model.transform(ds.select("features")).show()
session.close()
}
}
LinearRegression 线性回归
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, LinearRegressionTrainingSummary}
import org.apache.spark.sql.{DataFrame, SparkSession}
object LinearRegression_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
LabeledPoint(0, Vectors.dense(1.0, 0.1, -1.0)),
LabeledPoint(1, Vectors.dense(2.0, 1.1, 1.0)),
LabeledPoint(0, Vectors.dense(3.0, 10.1, 3.0))
))
// setMaxIter 最大迭代次数
// setRegParam 正则化系数
// setElasticNetParam 弹性网络混合参数, 取值0.0 - 1.0: 0.0表示L2正则, 1.0表示L1正则
val lr: LinearRegression = new LinearRegression().setMaxIter(500).setRegParam(0.3).setSolver("l-bfgs").setElasticNetParam(0.8)
val lrModel: LinearRegressionModel = lr.fit(dataFrame)
val res: DataFrame = lrModel.transform(dataFrame)
res.show()
// 模型摘要
val summary: LinearRegressionTrainingSummary = lrModel.summary
println(summary.totalIterations) // TODO 迭代次数
println("===========================================")
println(summary.objectiveHistory.mkString(",")) // TODO 每次迭代目标值
println("===========================================")
println(summary.rootMeanSquaredError)
println("===========================================")
println(summary.r2)
session.close()
}
}
特征提取
IDF (TF-IDF)
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, SparkSession}
object Idf_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().master("local").appName("IDF").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0.0, "Hi I heard About Spark"),
(0.0, "I Wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
dataFrame.show()
val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val tokenizerDf: DataFrame = tokenizer.transform(dataFrame) // TODO 分词
tokenizerDf.show()
val hashingTf: HashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20) // 将词语哈希到20维的特征向量
val hashingTfDf: DataFrame = hashingTf.transform(tokenizerDf) // TODO 获得词频向量
hashingTfDf.show()
val idf: IDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel: IDFModel = idf.fit(hashingTfDf) // TODO 将数据喂给IDF 得到IDF模型
val res: DataFrame = idfModel.transform(hashingTfDf.select("rawFeatures"))
res.show(false)
session.close()
}
}
Word2Vec 无监督
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.sql.{DataFrame, SparkSession}
object Word2Vec_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
"Hi I heard About Spark".split(" "),
"I Wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
dataFrame.show()
val vec: Word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(3).setMinCount(0)
val model: Word2VecModel = vec.fit(dataFrame)
val res: DataFrame = model.transform(dataFrame)
res.show(false)
session.close()
}
}
CountVectorizer | CountVectorizerModel 词频向量化
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}
object CV_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, Array("a", "b", "c", "a", "a", "a")),
(1, Array("d", "b", "b", "b", "c", "a"))
)).toDF("id", "words")
dataFrame.show()
// 从语料中学习词典(CountVectorizer 作为 Estimator)
val vectorizer: CountVectorizer = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(3).setMinDF(2)
val model1: CountVectorizerModel = vectorizer.fit(dataFrame)
model1.transform(dataFrame).show()
// 使用先验词典直接构造 CountVectorizerModel
val model2: CountVectorizerModel = new CountVectorizerModel(Array("a", "b", "c")).setInputCol("words").setOutputCol("features")
model2.transform(dataFrame).show()
session.close()
}
}
特征转换
Tokenizer | RegexTokenizer 分词器
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{DataFrame, SparkSession}
object Tokenizer_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, "Hi I heard About Spark"),
(1, "I Wish Java could use case classes"),
(2, "Logistic regression models are neat")
)).toDF("label", "sentence")
dataFrame.show()
// tokenizer分词器
val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
// regexTokenizer 分词器 可灵活指定正则表达式
val regexTokenizer: RegexTokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\w+").setGaps(false)
// udf 计算长度
val countTokens: UserDefinedFunction = udf {(words: Seq[String]) => words.length}
// 特征变换
val tokenizerDf: DataFrame = tokenizer.transform(dataFrame)
tokenizerDf.show()
tokenizerDf.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show()
session.close()
}
}
StopWordsRemover 去停用词
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.{DataFrame, SparkSession}
object StopWords_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, Seq("Hi", "I", "heard", "About", "Spark")),
(1, Seq("I", "Wish", "Java", "could", "use", "case", "classes")),
(2, Seq("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "raw")
dataFrame.show()
val remover: StopWordsRemover = new StopWordsRemover().setInputCol("raw").setOutputCol("filtered")
remover.transform(dataFrame).show(false)
session.close()
}
}
NGram N元词组
import org.apache.spark.ml.feature.NGram
import org.apache.spark.sql.{DataFrame, SparkSession}
object ngram_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, Seq("Hi", "I", "heard", "About", "Spark")),
(1, Seq("I", "Wish", "Java", "could", "use", "case", "classes"))
)).toDF("id", "words")
dataFrame.show()
val nGram: NGram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val nGramDf: DataFrame = nGram.transform(dataFrame)
nGramDf.select("ngrams").show(false)
session.close()
}
}
Binarizer 二值化
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.{DataFrame, SparkSession}
object Binarizer_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val data: Array[(Int, Double)] = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame: DataFrame = session.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer().setInputCol("feature").setOutputCol("binarized_feature").setThreshold(0.5)
val binarizedDf: DataFrame = binarizer.transform(dataFrame)
binarizedDf.show()
session.close()
}
}
PCA 主成分分析 (高维向量转低维向量)
import org.apache.spark.ml.feature.{PCA, PCAModel}
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object PCA_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val data: Array[linalg.Vector] = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val dataFrame: DataFrame = session.createDataFrame(data.map(Tuple1.apply)).toDF("feature")
// TODO 五维降至三维
val pca: PCA = new PCA().setInputCol("feature").setOutputCol("pcaFeatures").setK(3)
val model: PCAModel = pca.fit(dataFrame)
val res: DataFrame = model.transform(dataFrame)
res.show(false)
session.close()
}
}
OneHotEncoder 独热编码
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, StringIndexerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}
object OneHotEncoder_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("OneHotEncoder").master("local[*]").getOrCreate()
val df: DataFrame = session.createDataFrame(Seq(
(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a")
)).toDF("id", "category")
val indexerModel: StringIndexerModel = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").fit(df)
val indexed: DataFrame = indexerModel.transform(df)
val encoder: OneHotEncoder = new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")
val encoded: DataFrame = encoder.transform(indexed)
encoded.show()
session.close()
}
}
MinMaxScaler 最大最小值归一化
import org.apache.spark.ml.feature.{MinMaxScaler, MinMaxScalerModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object MinMaxScaler_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(0, Vectors.dense(2.0, 1.1, 1.0)),
(0, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("MinMaxScalerRes")
val model: MinMaxScalerModel = scaler.fit(dataFrame)
val res: DataFrame = model.transform(dataFrame)
res.show()
session.close()
}
}
VectorAssembler 合并特征向量
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object VectorAsembler_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(0, 18, 1.0, Vectors.dense(1.0, 0.1, -1.0), 1.0)
)).toDF("id", "hour", "mobile", "userFeatures", "clicked")
dataFrame.show()
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("hour", "mobile", "userFeatures")).setOutputCol("features")
val output: DataFrame = assembler.transform(dataFrame)
output.show()
session.close()
}
}
QuantileDiscretizer 按分位数分桶
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.{DataFrame, SparkSession}
object QuantitleDiscretizer_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Array(
(0, 18.0),
(1, 19.0),
(2, 8.0),
(3, 5.0),
(4, 2.2)
)).toDF("id", "hour")
dataFrame.show()
val discretizer: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("result").setNumBuckets(3)
val result: DataFrame = discretizer.fit(dataFrame).transform(dataFrame)
result.show()
session.close()
}
}
特征选择
RFormula R模型公式
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.{DataFrame, SparkSession}
object RFormula_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula: RFormula = new RFormula().setFormula("clicked ~ country + hour").setFeaturesCol("features").setLabelCol("label")
val output: DataFrame = formula.fit(dataFrame).transform(dataFrame)
output.show()
session.close()
}
}
ChiSqSelector 卡方特征选择器
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object ChiSqSelector_ML {
def main(args: Array[String]): Unit = {
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(data).toDF("id", "features", "clicked")
val selector: ChiSqSelector = new ChiSqSelector().setNumTopFeatures(1).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectedFeatures")
val res: DataFrame = selector.fit(dataFrame).transform(dataFrame)
res.show()
session.close()
}
}
决策树
DecisionTreeClassifier 决策树模型
RandomForestClassifier 随机森林模型
GBTClassifier GBDT模型
Pipeline 管道流水线
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, GBTClassifier, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorIndexer, VectorIndexerModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
case object decision {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().master("local").appName("hell").getOrCreate()
// TODO 数据源 https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt
val dataFrame: DataFrame = session.read.format("libsvm").load("src/main/resources/libsvm.txt")
dataFrame.show()
// 对标签进行索引编号
val StringIndexerModel: StringIndexerModel = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(dataFrame)
// 对离散特征进行标记索引, 以确定哪些特征是离散特征
// 如果一个特征的不同取值超过4个, 该特征视为连续特征, 否则标记为离散特征并进行索引编号
val VectorIndexerModel: VectorIndexerModel = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(dataFrame)
// 样本划分
val array: Array[Dataset[Row]] = dataFrame.randomSplit(Array(0.7, 0.3))
// 训练决策树模型
val dt: DecisionTreeClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
// 训练随机森林模型
val rf: RandomForestClassifier = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(10)
// 训练GBDT模型
val gbt: GBTClassifier = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)
// 将索引的标签转回原标签
val labelConverter: IndexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(StringIndexerModel.labels)
// 构建Pipeline
val pipeline1: Pipeline = new Pipeline().setStages(Array(StringIndexerModel, VectorIndexerModel, dt, labelConverter))
val pipeline2: Pipeline = new Pipeline().setStages(Array(StringIndexerModel, VectorIndexerModel, rf, labelConverter))
val pipeline3: Pipeline = new Pipeline().setStages(Array(StringIndexerModel, VectorIndexerModel, gbt, labelConverter))
// 开始训练(使用训练集 array(0), 测试集为 array(1))
val model1: PipelineModel = pipeline1.fit(array(0))
val model2: PipelineModel = pipeline2.fit(array(0))
val model3: PipelineModel = pipeline3.fit(array(0))
val predictions: DataFrame = model1.transform(array(1))
predictions.show()
/**
* TODO 其他写法相同
* 随机森林模型 和 GBDT模型 此处省略
*/
session.close()
}
}
聚类
KMeans 聚类
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object KMeans_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("KMeans").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
LabeledPoint(1, Vectors.dense(1.0, 2.0)),
LabeledPoint(2, Vectors.dense(9.0, 22.0)),
LabeledPoint(3, Vectors.dense(19.0, 22.0))
))
val kMeans: KMeans = new KMeans().setK(2).setMaxIter(100).setSeed(1L)
val model: KMeansModel = kMeans.fit(dataFrame)
val res: DataFrame = model.transform(dataFrame)
res.show()
session.close()
}
}
LDA 主题聚类
import org.apache.spark.ml.clustering.{LDA, LDAModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}
object IDA_ML {
def main(args: Array[String]): Unit = {
val session: SparkSession = new SparkSession.Builder().appName("KMeans").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.createDataFrame(Seq(
LabeledPoint(1, Vectors.dense(1.0, 2.0)),
LabeledPoint(2, Vectors.dense(9.0, 22.0)),
LabeledPoint(3, Vectors.dense(19.0, 22.0))
))
val lda: LDA = new LDA().setK(10).setMaxIter(10)
val model: LDAModel = lda.fit(dataFrame)
val ll: Double = model.logLikelihood(dataFrame)
val lp: Double = model.logPerplexity(dataFrame)
println(ll)
println(lp)
session.close()
}
}
Spark Streaming
word count
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]): Unit = {
// TODO 创建环境对象
// StreamingContext创建时,需要传递两个参数
// 第一个参数表示环境配置
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 第二个参数表示采集周期
val ssc = new StreamingContext(sparkConf, Seconds(3))
// TODO 逻辑处理
// 获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1", 9999)
val words: DStream[String] = lines.flatMap((_: String).split(" "))
val wordToOne: DStream[(String, Int)] = words.map(((_: String), 1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey((_: Int) + (_: Int))
wordToCount.print()
// TODO 关闭环境
// 由于SparkStreaming的采集器需要长期运行, 所以不能直接关闭
// 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
// ssc.stop()
// 启动采集器
ssc.start()
// 等待采集器的关闭
ssc.awaitTermination()
ssc.stop()
}
}
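本地测试时, 可以先在另一个终端用 netcat 之类的工具监听 9999 端口(例如 nc -lk 9999)并输入文本, 再启动上面的程序, 每个 3 秒的采集周期会输出一次词频统计。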
QueueStream
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Queue {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Queue")
val streamingContext = new StreamingContext(sparkConf, Seconds(3))
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream: InputDStream[Int] = streamingContext.queueStream(rddQueue, oneAtATime = false)
val mappedStream: DStream[(Int, Int)] = inputStream.map(((_: Int), 1))
val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey((_: Int) + (_: Int))
reducedStream.print()
streamingContext.start()
for (i <- 1 to 5){
rddQueue += streamingContext.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
streamingContext.awaitTermination()
}
}
MyReceiver 自定义采集器
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
object Queue {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Queue")
val streamingContext = new StreamingContext(sparkConf, Seconds(3))
val messageDS: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver())
messageDS.print()
streamingContext.start()
streamingContext.awaitTermination()
}
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var flag = true
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while (flag){
val message: String = "采集的数据为:" + new Random().nextInt(10).toString
// 将采集到的数据交给框架, 否则下游接收不到数据
store(message)
Thread.sleep(5000)
}
}
}).start()
}
override def onStop(): Unit = {
flag = false
}
}
}