Spark Notes (from two years ago)

Author: 会爬虫的小蟒蛇 | Published 2022-09-29 15:06

Connecting to Spark Core

Maven dependency for Spark Core

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>

Accessing an HDFS cluster

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>your-hdfs-version</version>
        </dependency>
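
With this dependency on the classpath, an HDFS path can be passed to textFile just like a local path. A minimal sketch, assuming a SparkContext named sc (created in the next section) and a placeholder namenode address:

        // hdfs://namenode-host:8020/path/to/data.txt is a placeholder URI, not a value from these notes
        val hdfsLines: RDD[String] = sc.textFile("hdfs://namenode-host:8020/path/to/data.txt")
        hdfsLines.take(10).foreach(println)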

Imports in the program

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

Initializing Spark Core

Creating the connection

        // SparkConf: the basic Spark configuration object
        // setMaster: the execution environment (master URL)
        // setAppName: the application name
        val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("wordCount")

        // SparkContext establishes the connection to the Spark framework
        val sc: SparkContext = new SparkContext(sparkConf)

Closing the connection

        // close the connection
        sc.stop()

Running a Spark JAR from the command line

# --class:  the main class to run
# --master: the execution environment
# followed by the application JAR and its arguments
bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[2] \
  ./examples/jars/spark-examples_2.12-3.0.0.jar \
  10

Resilient Distributed Datasets (RDDs)

Parallelized collections

        val array: Array[Int] = Array(1, 2, 3, 4, 5)

        // val rdd: RDD[Int] = sc.makeRDD(array)    // personal preference (makeRDD simply delegates to parallelize)
        val rdd: RDD[Int] = sc.parallelize(array)    // standard form used in the documentation

        rdd.collect().foreach(println)

External datasets

        // read from src/main/resources/full_gitee.csv; wildcard paths such as src/main/resources/* are also supported
        val lines: RDD[String] = sc.textFile("src/main/resources/full_gitee.csv")
        lines.collect().foreach(println)

RDD operations

        // read the CSV file
        val lines:RDD[String] = sc.textFile("src/main/resources/full_gitee.csv")
        // get the length of each line
        val lineLengths: RDD[Int] = lines.map(s => s.length)
        // sum the lengths with reduce
        val totalLength: Int = lineLengths.reduce((a, b) => a + b)

        print(totalLength)

Common RDD operators

map: apply a given function to every element

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

        val mapRDD: RDD[Int] = rdd.map((_: Int)*2)

        mapRDD.collect().foreach(println)

flatMap: flatten nested structures

        val rdd: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)), 2)

        val flatMapRDD: RDD[Any] = rdd.flatMap {
            case list: List[_] => list
            case d => List(d)
        }
        flatMapRDD.collect().foreach(println)

mapPartitions: transform data one partition at a time

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        // mapPartitions transforms the data one whole partition at a time,
            //              but it pulls the entire partition into memory.
            //              Processed elements are not released while object references to them remain,
            //              so with limited memory and large data volumes it can easily cause an OutOfMemoryError.
        val mapRDD: RDD[Int] = rdd.mapPartitions(
            (iter: Iterator[Int]) => {
                println(">>>> ")
                iter.map(_ * 2)
            }
        )

        mapRDD.collect().foreach(println)

mapPartitionsWithIndex: per-partition computation with access to the partition index

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        val mapRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
            // index is the partition index
            (index: Int, iter: Iterator[Int]) => {
                if (index == 1){
                    iter
                }else{
                    Nil.iterator
                }
            }
        )

        mapRDD.collect().foreach(println)

glom: gather the elements of each partition into an Array

        val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)

        val glomRDD: RDD[Array[Any]] = rdd.glom()

        glomRDD.collect().foreach(data => println(data.mkString(",")))

saveAsTextFile: save to text files

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        val mapRDD: RDD[Int] = rdd.map(_ * 2)

        mapRDD.saveAsTextFile("src/main/resources/output")

groupBy: group elements by a key function

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        val groupByRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)

        groupByRDD.collect().foreach(println)

filter: keep the elements that match a predicate

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)

        filterRDD.collect().foreach(println)

sample: take a sample of the data

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

        val str: String = rdd.sample(
            // first parameter: whether sampled elements are put back; true = with replacement, false = without replacement
            false,
            // second parameter: the probability of each element in the source being sampled
            0.4
            // third parameter: the seed for the random number generator
//            1
        ).collect().mkString(",")

        println(str)

distinct: remove duplicates

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 2)

        val rdd1: RDD[Int] = rdd.distinct()

        rdd1.collect().foreach(println)

coalesce: decrease the number of partitions

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 4)

        val newRDD: RDD[Int] = rdd.coalesce(2)

        newRDD.saveAsTextFile("src/main/resources/output")

repartition: increase the number of partitions

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4, 9, 10), 2)

        val newRDD: RDD[Int] = rdd.repartition(3)

        newRDD.saveAsTextFile("src/main/resources/output")

sortBy: sort

        val rdd: RDD[Int] = sc.makeRDD(List(2, 4, 1, 3), 2)

        val newRDD: RDD[Int] = rdd.sortBy(num => num)

        newRDD.saveAsTextFile("src/main/resources/output")

count: count the elements in the dataset

val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)

rdd.collect()

val len: Long = rdd.count()

print(len)

sortByKey: sort by key

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)

val sortByKeyrdd: RDD[(String, Int)] = rdd.sortByKey()

sortByKeyrdd.collect().foreach(println)

groupByKey: group values by key

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)

val groupByKeyRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

groupByKeyRDD.collect().foreach(println)

reduceByKey: group by key and reduce the values

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("C", 1), ("B", 1), ("A", 1)), 2)

val reduceByKeyRDD: RDD[(String, Int)] = rdd.reduceByKey((a, b) => (a + b))

reduceByKeyRDD.collect().foreach(println)

Connecting to Spark SQL

Maven dependency for Spark SQL

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>

Imports in the program

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

Initializing Spark SQL

Creating the connection

        // create the Spark SQL execution environment
        val sparkconf: SparkConf = new SparkConf().setMaster("local[*]")
                .setAppName("snakeSparkSQL")
                .set("spark.sql.warehouse.dir","file:///")
        val spark: SparkSession = new SparkSession.Builder().config(sparkconf).getOrCreate()

Closing the connection

        // close the connection
        spark.stop()

SQL

Data source

{"name": "yr", "age": 18}
{"name": "yr1", "age": 14}
{"name": "yr2", "age": 13}
{"name": "yr3", "age": 2}
{"name": "yr4", "age": 2}
{"name": "yr5", "age": 5}

Reading from the data source

        // read the data into a DataFrame; DataFrameReader also supports csv, format, jdbc, load, option, options, orc, schema, table, text, textFile
        val df: DataFrame = spark.read.json("src/main/resources/data.json")
//        df.show()

Registering the DataFrame as a temporary view

        df.createOrReplaceTempView("dataTable")

Running SQL against the temporary view

        spark.sql("select * from dataTable").show()
        spark.sql("select name,age from dataTable").show()
        spark.sql("select avg(age) from dataTable").show()

Using the DSL directly on the DataFrame

        df.select("name", "age").show()

        // When DataFrame operations involve conversions (such as the $ column syntax below),
        // the implicit conversion rules from the SparkSession instance (the val named spark) must be imported.
        import spark.implicits._
        df.select($"age" + 1).show()
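
Two more DSL calls on the same df, as a quick sketch of the same style (filter and groupBy are standard DataFrame methods):

        // filter and aggregate without writing SQL
        df.filter($"age" > 10).show()
        df.groupBy("name").count().show()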

Dataset

        // A DataFrame is just a Dataset with a specific type parameter (Dataset[Row]), so every DataFrame method also works on a Dataset.
        val seq: Seq[Int] = Seq(1, 2, 3, 4)
        val ds: Dataset[Int] = seq.toDS() // toDS requires import spark.implicits._
        ds.show()

RDD <=> DataFrame

        // create an RDD
        val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "yr1", 11), (2, "yr2", 12), (3, "yr3", 13), (4, "yr4", 14)))
        // RDD to DataFrame (requires import spark.implicits._)
        val df: DataFrame = rdd.toDF("id", "name", "age")
        // DataFrame to RDD
        val rdd1: RDD[Row] = df.rdd

Dataset <=> DataFrame

        // DataFrame to Dataset
        val ds: Dataset[User] = df.as[User]
        // Dataset to DataFrame
        val newdf: DataFrame = ds.toDF()

RDD <=> Dataset

        // RDD to Dataset
        val newds: Dataset[User] = rdd.map {
            case (id, name, age) =>
                User(id, name, age)
        }.toDS()
        // Dataset to RDD
        val rdd2: RDD[User] = newds.rdd
        val rdd2: RDD[User] = newds.rdd
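
The conversions above use a User type that is not shown in these notes; they assume a simple case class along these lines:

        // matches the (id, name, age) tuples used above; define it at the top level (outside main) so Spark can derive an Encoder for it
        case class User(id: Int, name: String, age: Int)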

Using a custom UDF

        // register a UDF that adds a prefix to every name (this snippet assumes a temp view named "user" has been registered)
        spark.udf.register("prefixName", (name: String) =>{
            "name" + name
        })

        spark.sql("select prefixName(name), age from user").show()

Full example: a custom aggregate function class that computes the average age

Weakly-typed version (UserDefinedAggregateFunction)

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object mySPKsql {
    def main(args: Array[String]): Unit = {
        val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("snakeSparkSQL").set("spark.sql.warehouse.dir","file:///")
        val spark: SparkSession = new SparkSession.Builder().config(sparkconf).getOrCreate()

        import spark.implicits._

        val df: DataFrame = spark.read.json("src/main/resources/data.json")

        df.createOrReplaceTempView("user")

        spark.udf.register("ageAVG", new MyAvgUDAF)

        spark.sql("select ageAVG(age) from user").show()

        spark.stop()
    }


    /**
     * Custom aggregate function class: compute the average age
     */
    class MyAvgUDAF extends UserDefinedAggregateFunction{
        // structure of the input data
        override def inputSchema: StructType = {
            StructType(
                Array(
                    StructField("age", LongType)
                )
            )
        }

        // structure of the aggregation buffer
        override def bufferSchema: StructType = {
            StructType(
                Array(
                    StructField("total", LongType),
                    StructField("count", LongType)
                )
            )
        }

        // data type of the function's result
        override def dataType: DataType = LongType

        // whether the function is deterministic
        override def deterministic: Boolean = true

        // initialize the buffer
        override def initialize(buffer: MutableAggregationBuffer): Unit = {
//            buffer(0) = 0L
//            buffer(1) = 0L

            buffer.update(0, 0L)
            buffer.update(1, 0L)
        }

        // update the buffer with a new input row
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
            buffer.update(0, buffer.getLong(0) + input.getLong(0))
            buffer.update(1, buffer.getLong(1) + 1)
        }

        // merge buffers from different partitions
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
            buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
            buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
        }

        // compute the average
        override def evaluate(buffer: Row): Any = {
            buffer.getLong(0)/buffer.getLong(1)
        }
    }
}

Strongly-typed version (Aggregator)

Spark 3.0 or later.
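
A minimal sketch of the strongly-typed approach (hence the Spark 3.0 note above): an Aggregator that computes the same average age, registered with functions.udaf so it can be called from SQL exactly like the weakly-typed version, reusing the same "user" view:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}

// mutable buffer holding the running sum and count
case class AvgBuffer(var total: Long, var count: Long)

class MyAvgAggregator extends Aggregator[Long, AvgBuffer, Long] {
    // initial (zero) buffer
    override def zero: AvgBuffer = AvgBuffer(0L, 0L)

    // fold one input value into the buffer
    override def reduce(buffer: AvgBuffer, age: Long): AvgBuffer = {
        buffer.total += age
        buffer.count += 1
        buffer
    }

    // merge buffers coming from different partitions
    override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
        b1.total += b2.total
        b1.count += b2.count
        b1
    }

    // final result
    override def finish(buffer: AvgBuffer): Long = buffer.total / buffer.count

    override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// registration and use, analogous to the weakly-typed example:
// spark.udf.register("ageAVG", functions.udaf(new MyAvgAggregator))
// spark.sql("select ageAVG(age) from user").show()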

Generic data loading

spark.read.format("json").load("data.json")

Running SQL without creating a temporary view

spark.sql("select * from json.`data.json`").show

Generic data saving

df.write.format("json").save("output")

Save modes

/**
 * error     (default) throw an exception if the output already exists
 * append    append to the output if it already exists
 * overwrite overwrite the output if it already exists
 * ignore    do nothing if the output already exists
 */
df.write.format("json").mode("overwrite").save("output")

Connecting Spark SQL to MySQL

Maven configuration

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>

Reading data

        val df: DataFrame = spark.read.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/chu?useUnicode=true&characterEncoding=utf8")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("user", "root")
            .option("password", "root")
            .option("dbtable", "player")
            .load()
        df.show()

Writing data

// requires: import java.util.Properties and import org.apache.spark.sql.SaveMode
val properties = new Properties()
properties.put("driver", "com.mysql.cj.jdbc.Driver") // driver class name for MySQL Connector/J 8.x
properties.put("user", "root")
properties.put("password", "root")
val url = "jdbc:mysql://172.16.1.110:3306/demo1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8"
dataFrame.write.mode(SaveMode.Append).jdbc(url, "demo1_1", properties)

Reading and writing Hive

  def main(args: Array[String]): Unit = {
    val sparkconf: SparkConf = new SparkConf()
      .setMaster("spark://172.16.1.120:7077")
      .setAppName("snakeSparkSQL")
      .set("spark.sql.warehouse.dir","hdfs://172.16.1.120:10000/user/hive/warehouse")
      .set("hive.metastore.uris","thrift://172.16.1.120:9083")

    val spark: SparkSession = new SparkSession
      .Builder()
      .config(sparkconf)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val dataFrame: DataFrame = spark.read.table("default.stu2")

    dataFrame.printSchema()

    dataFrame.show()
    dataFrame.write.format("Hive").mode(SaveMode.Append).saveAsTable("stu2")

    spark.sql("drop table stu2")
    
    spark.stop()
  }

Spark MLlib

MLlib (RDD-based API)

MLlib basic data types

VectorsAndLabeledPoint

import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object VectorsAndLabeledPoint {
  def main(args: Array[String]): Unit = {
    val vector: linalg.Vector = Vectors.dense(1, 2, 3)

    val point: LabeledPoint = LabeledPoint(1, vector)

    println(point.features)
    println(point.label)
  }
}

MLlib basic statistics

Statistics: mean, variance, and max

import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object MeanAndVariance {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession.Builder().master("local").appName("MeanAndVariance").getOrCreate()
    import sparkSession.implicits._
    val dataFrame: DataFrame = sparkSession.read.option("sep", " ").csv("src/main/resources/Kmeans.csv")

    val rdd: RDD[linalg.Vector] = dataFrame.rdd.map((row: Row) => Vectors.dense(row.getAs[String]("_c0").toDouble))

    val summary: MultivariateStatisticalSummary = Statistics.colStats(rdd)

    println("均值:" + summary.mean)
    println("标准差" + summary.variance)
    println("最大值" + summary.max)

    sparkSession.close()
  }
}

Mode (most frequent value)

import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ModeNumber {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession.Builder()
      .master("local")
      .appName("ApproxQuantile")
      .getOrCreate()
    import sparkSession.implicits._

    val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/ApproxQuantile.csv")
      .map((row: Row) =>
        row.getAs[String]("_c0").toDouble
      ).toDF("x")

    val modeNumber: Double = dataFrame.groupBy("x").count().orderBy(desc("count")).rdd.first().getAs[Double]("x")

    println(modeNumber)

    sparkSession.close()
  }
}

approxQuantile: quantiles and the median

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ApproxQuantile {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession.Builder()
      .master("local")
      .appName("ApproxQuantile")
      .getOrCreate()
    import sparkSession.implicits._

    val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/ApproxQuantile.csv")
      .map((row: Row) =>
        row.getAs[String]("_c0").toDouble
      ).toDF("x")

    /**
     * def approxQuantile(col: String, probabilities: Array[Double], relativeError: Double): Array[Double]
     * col: the column name
     * probabilities: the quantile probabilities to compute; each value must be in [0, 1] (0 is the minimum, 0.5 the median, 1 the maximum)
     * relativeError: the relative target precision (>= 0); 0 computes the exact quantiles, which can be very expensive; values greater than 1 behave like 1
     */
    val doubles: Array[Double] = dataFrame.stat.approxQuantile("x", Array(0.5), 0)

    println(doubles.mkString("Array(", ", ", ")"))
    sparkSession.close()
  }
}

Linear regression

LinearRegression

Data (src/main/resources/LinearRegression.csv), columns: demand (y), price (x1), input (x2)
100,5,1000
75,7,600
80,6,1200
70,6,500
50,8,30
65,7,400
90,5,1300
100,4,1100
110,3,1300
60,9,300

import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object LinearRegression {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession
      .Builder()
      .master("local[*]")
      .appName("LinearRegression")
      .getOrCreate()
    import sparkSession.implicits._

   val dataFrame: DataFrame = sparkSession.read.csv("src/main/resources/LinearRegression.csv").toDF("y", "x1", "x2")

    val ds: Dataset[LabeledPoint] = dataFrame.map((row: Row) => {
      LabeledPoint(row.getAs[String]("y").toDouble,
        Vectors.dense(
          row.getAs[String]("x1").toDouble,
          row.getAs[String]("x2").toDouble
        )
      )
    }).cache()

    val model: LinearRegressionModel = LinearRegressionWithSGD.train(ds.rdd, 100, 0.1)

    val result: Double = model.predict(Vectors.dense(2.0, 2000.0))

    println(result)

   sparkSession.close()
  }
}

Clustering

KMeans

Data (src/main/resources/Kmeans.csv):

1 2
1 1
1 3
2 2
3 4
4 3
2 2
4 4

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object MLlibKmeans {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession.Builder().master("local[1]").appName("Kmeans").getOrCreate()
    import sparkSession.implicits._
    val dataFrame: DataFrame = sparkSession.read.option("delimiter", " ").csv("src/main/resources/Kmeans.csv").toDF("x", "y")

    val rdd: RDD[linalg.Vector] = dataFrame.rdd.map((row: Row) => {
      Vectors.dense(row.getAs[String]("x").toDouble, row.getAs[String]("y").toDouble)
    })

    val model: KMeansModel = KMeans.train(rdd, 4, 2000)

    model.clusterCenters.foreach(println)

    sparkSession.close()
  }
}

ML (DataFrame-based API)

Regression

LogisticRegression (logistic regression)

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// regression
object Pip_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().master("local").appName("Pip").getOrCreate()
    import session.implicits._
    val ds: Dataset[LabeledPoint] = session.read.csv("src/main/resources/LinearRegression.csv").map((row: Row) => LabeledPoint(row.getAs[String]("_c0").toDouble, Vectors.dense(row.getAs[String]("_c1").toDouble, row.getAs[String]("_c2").toDouble)))
    ds.show()
    println("=========================================================================")

    val lr: LogisticRegression = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    println( lr.explainParams() )
    println("=========================================================================")

    val model: LogisticRegressionModel = lr.fit(ds)
    println( model.parent.extractParamMap )
    println("=========================================================================")

    model.transform(ds.select("features")).show()
    session.close()
  }
}

LinearRegression (linear regression)

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, LinearRegressionTrainingSummary}
import org.apache.spark.sql.{DataFrame, SparkSession}

object LinearRegression_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Seq(
      LabeledPoint(0, Vectors.dense(1.0, 0.1, -1.0)),
      LabeledPoint(1, Vectors.dense(2.0, 1.1, 1.0)),
      LabeledPoint(0, Vectors.dense(3.0, 10.1, 3.0))
    ))

    // setMaxIter: maximum number of iterations
    // setRegParam: regularization strength; setElasticNetParam: ElasticNet mixing parameter in [0, 1] (0.0 = pure L2 penalty, 1.0 = pure L1 penalty)
    val lr: LinearRegression = new LinearRegression().setMaxIter(500).setRegParam(0.3).setSolver("l-bfgs").setElasticNetParam(0.8)
    val lrModel: LinearRegressionModel = lr.fit(dataFrame)

    val res: DataFrame = lrModel.transform(dataFrame)
    res.show()

    // model summary
    val summary: LinearRegressionTrainingSummary = lrModel.summary

    println(summary.totalIterations) // number of iterations
    println("===========================================")
    println(summary.objectiveHistory.mkString(",")) // objective value at each iteration
    println("===========================================")
    println(summary.rootMeanSquaredError)
    println("===========================================")
    println(summary.r2)
    session.close()
  }
}

Feature extraction

TF-IDF (HashingTF + IDF)

import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, SparkSession}

object Idf_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().master("local").appName("IDF").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0.0, "Hi I heard About Spark"),
      (0.0, "I Wish Java could use case classes"),
      (1.0, "Logistic regression models are neat")
    )).toDF("label", "sentence")

    dataFrame.show()

    val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    val tokenizerDf: DataFrame = tokenizer.transform(dataFrame) // tokenize the sentences
    tokenizerDf.show()

    val hashingTf: HashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20) // hash the words into 20 feature dimensions
    val hashingTfDf: DataFrame = hashingTf.transform(tokenizerDf) // term-frequency vectors
    hashingTfDf.show()

    val idf: IDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel: IDFModel = idf.fit(hashingTfDf) // fit the IDF estimator on the data to obtain an IDF model

    val res: DataFrame = idfModel.transform(hashingTfDf.select("rawFeatures"))
    res.show(false)
    session.close()
  }
}

Word2Vec (unsupervised)

import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

object Word2Vec_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      "Hi I heard About Spark".split(" "),
      "I Wish Java could use case classes".split(" "),
      "Logistic regression models are neat".split(" ")
    ).map(Tuple1.apply)).toDF("text")
    dataFrame.show()

    val vec: Word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(3).setMinCount(0)
    val model: Word2VecModel = vec.fit(dataFrame)

    val res: DataFrame = model.transform(dataFrame)
    res.show(false)

    session.close()
  }
}

CountVectorizer / CountVectorizerModel

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

object CV_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, Array("a", "b", "c", "a", "a", "a")),
      (1, Array("d", "b", "b", "b", "c", "a"))
    )).toDF("id", "words")
    dataFrame.show()

    // learn the vocabulary from the corpus (fit)
    val vectorizer: CountVectorizer = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(3).setMinDF(2)
    val model1: CountVectorizerModel = vectorizer.fit(dataFrame)
    model1.transform(dataFrame).show()

    // use a predefined vocabulary
    val model2: CountVectorizerModel = new CountVectorizerModel(Array("a", "b", "c")).setInputCol("words").setOutputCol("features")
    model2.transform(dataFrame).show()

    session.close()
  }
}

Feature transformation

Tokenizer | RegexTokenizer (tokenizers)

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{DataFrame, SparkSession}


object Tokenizer_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, "Hi I heard About Spark"),
      (1, "I Wish Java could use case classes"),
      (2, "Logistic regression models are neat")
    )).toDF("label", "sentence")
    dataFrame.show()

    // Tokenizer: simple whitespace tokenizer
    val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    // RegexTokenizer: tokenizer driven by a regular expression ("\\w+" with gaps = false extracts word tokens)
    val regexTokenizer: RegexTokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern("\\w+").setGaps(false)

    // UDF that counts the number of tokens
    val countTokens: UserDefinedFunction =  udf {(words: Seq[String]) => words.length}

    // apply the tokenizer (feature transformation)
    val tokenizerDf: DataFrame = tokenizer.transform(dataFrame)
    tokenizerDf.show()
    tokenizerDf.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show()

    session.close()
  }
}

StopWordsRemover (remove stop words)

import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.{DataFrame, SparkSession}

object StopWords_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, Seq("Hi", "I", "heard", "About", "Spark")),
      (1, Seq("I", "Wish", "Java", "could", "use", "case", "classes")),
      (2, Seq("Logistic", "regression", "models", "are", "neat"))
    )).toDF("id", "raw")
    dataFrame.show()

    val remover: StopWordsRemover = new StopWordsRemover().setInputCol("raw").setOutputCol("filtered")

    remover.transform(dataFrame).show(false)
    
    session.close()
  }
}

NGram (n-gram generation)

import org.apache.spark.ml.feature.NGram
import org.apache.spark.sql.{DataFrame, SparkSession}

object ngram_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, Seq("Hi", "I", "heard", "About", "Spark")),
      (1, Seq("I", "Wish", "Java", "could", "use", "case", "classes"))
    )).toDF("id", "words")
    dataFrame.show()

    val nGram: NGram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
    val nGramDf: DataFrame = nGram.transform(dataFrame)

    nGramDf.select("ngrams").show(false)
    session.close()
  }
}

Binarizer (binarization)

import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.{DataFrame, SparkSession}

object Binarizer_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val data: Array[(Int, Double)] = Array((0, 0.1), (1, 0.8), (2, 0.2))
    val dataFrame: DataFrame = session.createDataFrame(data).toDF("id", "feature")

    val binarizer: Binarizer = new Binarizer().setInputCol("feature").setOutputCol("binarized_feature").setThreshold(0.5)

    val binarizedDf: DataFrame = binarizer.transform(dataFrame)

    binarizedDf.show()

    session.close()
  }
}

PCA: principal component analysis (project high-dimensional vectors to a lower dimension)

import org.apache.spark.ml.feature.{PCA, PCAModel}
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object PCA_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val data: Array[linalg.Vector] = Array(
      Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
      Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
      Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
    )
    val dataFrame: DataFrame = session.createDataFrame(data.map(Tuple1.apply)).toDF("feature")

    // reduce from five dimensions to three
    val pca: PCA = new PCA().setInputCol("feature").setOutputCol("pcaFeatures").setK(3)
    val model: PCAModel = pca.fit(dataFrame)
    val res: DataFrame = model.transform(dataFrame)
    res.show(false)

    session.close()
  }
}

OneHotEncoder (one-hot encoding)

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, StringIndexerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

object OneHotEncoder_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("OneHotEncoder").master("local[*]").getOrCreate()

    val df: DataFrame = session.createDataFrame(Seq(
      (0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a")
    )).toDF("id", "category")

    val indexerModel: StringIndexerModel = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex").fit(df)

    val indexed: DataFrame = indexerModel.transform(df)

    val encoder: OneHotEncoder = new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")
    val encoded: DataFrame = encoder.transform(indexed)
    encoded.show()

    session.close()
  }
}

MinMaxScaler (min-max normalization)

import org.apache.spark.ml.feature.{MinMaxScaler, MinMaxScalerModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object MinMaxScaler_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.1, -1.0)),
      (0, Vectors.dense(2.0, 1.1, 1.0)),
      (0, Vectors.dense(3.0, 10.1, 3.0))
    )).toDF("id", "features")

    val scaler: MinMaxScaler = new MinMaxScaler().setInputCol("features").setOutputCol("MinMaxScalerRes")
    val model: MinMaxScalerModel = scaler.fit(dataFrame)

    val res: DataFrame = model.transform(dataFrame)
    res.show()

    session.close()
  }
}

VectorAssembler (assemble feature columns into a single vector)

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object VectorAsembler_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (0, 18, 1.0, Vectors.dense(1.0, 0.1, -1.0), 1.0)
    )).toDF("id", "hour", "mobile", "userFeatures", "clicked")
    dataFrame.show()

    val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("hour", "mobile", "userFeatures")).setOutputCol("features")
    val output: DataFrame = assembler.transform(dataFrame)
    output.show()

    session.close()
  }
}

QuantileDiscretizer (quantile bucketing)

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.{DataFrame, SparkSession}

object QuantitleDiscretizer_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Array(
      (0, 18.0),
      (1, 19.0),
      (2, 8.0),
      (3, 5.0),
      (4, 2.2)
    )).toDF("id", "hour")
    dataFrame.show()

    val discretizer: QuantileDiscretizer = new QuantileDiscretizer().setInputCol("hour").setOutputCol("result").setNumBuckets(3)

    val result: DataFrame = discretizer.fit(dataFrame).transform(dataFrame)
    result.show()

    session.close()
  }
}

Feature selection

RFormula (R model formula)

import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.{DataFrame, SparkSession}

object RFormula_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(Seq(
      (7, "US", 18, 1.0),
      (8, "CA", 12, 0.0),
      (9, "NZ", 15, 0.0)
    )).toDF("id", "country", "hour", "clicked")

    val formula: RFormula = new RFormula().setFormula("clicked ~ country + hour").setFeaturesCol("features").setLabelCol("label")

    val output: DataFrame = formula.fit(dataFrame).transform(dataFrame)
    output.show()
    
    session.close()
  }
}

ChiSqSelector (chi-squared feature selector)

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object ChiSqSelector_ML {
  def main(args: Array[String]): Unit = {
    val data = Seq(
      (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
      (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
      (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
    )
    val session: SparkSession = new SparkSession.Builder().appName("").master("local[*]").getOrCreate()
    val dataFrame: DataFrame = session.createDataFrame(data).toDF("id", "features", "clicked")

    val selector: ChiSqSelector = new ChiSqSelector().setNumTopFeatures(1).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectedFeatures")
    val res: DataFrame = selector.fit(dataFrame).transform(dataFrame)
    res.show()

    session.close()
  }
}

Decision trees

DecisionTreeClassifier (decision tree model)

RandomForestClassifier (random forest model)

GBTClassifier (gradient-boosted trees / GBDT model)

Pipeline (ML pipeline)

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, GBTClassifier, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorIndexer, VectorIndexerModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case object decision {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().master("local").appName("hell").getOrCreate()

    // data source: https://github.com/apache/spark/blob/master/data/mllib/sample_libsvm_data.txt
    val dataFrame: DataFrame = session.read.format("libsvm").load("src/main/resources/libsvm.txt")
    dataFrame.show()

    // index the label column
    val stringIndexerModel: StringIndexerModel = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(dataFrame)
    // index categorical features so they can be distinguished from continuous ones:
    // a feature with more than 4 distinct values is treated as continuous, otherwise it is marked as categorical and indexed
    val vectorIndexerModel: VectorIndexerModel = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(dataFrame)

    // split the samples into training (70%) and test (30%) sets
    val array: Array[Dataset[Row]] = dataFrame.randomSplit(Array(0.7, 0.3))

    // decision tree classifier
    val dt: DecisionTreeClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
    // random forest classifier
    val rf: RandomForestClassifier = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setNumTrees(10)
    // GBDT (gradient-boosted trees) classifier
    val gbt: GBTClassifier = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)

    // convert the indexed labels back to the original labels
    val labelConverter: IndexToString = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexerModel.labels)

    // build the pipelines
    val pipeline1: Pipeline = new Pipeline().setStages(Array(stringIndexerModel, vectorIndexerModel, dt, labelConverter))
    val pipeline2: Pipeline = new Pipeline().setStages(Array(stringIndexerModel, vectorIndexerModel, rf, labelConverter))
    val pipeline3: Pipeline = new Pipeline().setStages(Array(stringIndexerModel, vectorIndexerModel, gbt, labelConverter))

    // train on the training split
    val model1: PipelineModel = pipeline1.fit(array(0))
    val model2: PipelineModel = pipeline2.fit(array(0))
    val model3: PipelineModel = pipeline3.fit(array(0))

    val predictions: DataFrame = model1.transform(array(1))
    predictions.show()

    /**
     * The random forest and GBDT predictions are written the same way;
     * they are omitted here (a sketch follows after this object).
     */

    session.close()
  }
}
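
A sketch of the omitted random forest and GBDT predictions, following the same pattern as model1 above (these lines belong inside the same main method, reusing model2, model3 and array):

    // same pattern as model1, applied to the other two trained pipelines
    val rfPredictions: DataFrame = model2.transform(array(1))
    rfPredictions.show()

    val gbtPredictions: DataFrame = model3.transform(array(1))
    gbtPredictions.show()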

Clustering

KMeans clustering

import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object KMeans_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("KMeans").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      LabeledPoint(1, Vectors.dense(1.0, 2.0)),
      LabeledPoint(2, Vectors.dense(9.0, 22.0)),
      LabeledPoint(3, Vectors.dense(19.0, 22.0))
    ))

    val kMeans: KMeans = new KMeans().setK(2).setMaxIter(100).setSeed(1L)
    val model: KMeansModel = kMeans.fit(dataFrame)

    val res: DataFrame = model.transform(dataFrame)
    res.show()

    session.close()
  }
}

LDA (topic model clustering)

import org.apache.spark.ml.clustering.{LDA, LDAModel}
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object IDA_ML {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = new SparkSession.Builder().appName("KMeans").master("local[*]").getOrCreate()

    val dataFrame: DataFrame = session.createDataFrame(Seq(
      LabeledPoint(1, Vectors.dense(1.0, 2.0)),
      LabeledPoint(2, Vectors.dense(9.0, 22.0)),
      LabeledPoint(3, Vectors.dense(19.0, 22.0))
    ))

    val lda: LDA = new LDA().setK(10).setMaxIter(10)
    val model: LDAModel = lda.fit(dataFrame)

    val ll: Double = model.logLikelihood(dataFrame)
    val lp: Double = model.logPerplexity(dataFrame)
    
    println(ll)
    println(lp)

    session.close()
  }
}


Spark Streaming

word count

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    // TODO create the environment object
    // StreamingContext takes two parameters:
    // the first is the environment configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // the second is the batch interval (how often data is collected)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // TODO processing logic
    // read data from the socket
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1", 9999)

    val words: DStream[String] = lines.flatMap((_: String).split(" "))

    val wordToOne: DStream[(String, Int)] = words.map(((_: String), 1))

    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey((_: Int) + (_: Int))

    wordToCount.print()
    // TODO shutting down the environment
    // Spark Streaming runs as a long-lived collector, so the context cannot simply be stopped here;
    // if main returned, the application would end too, so main must not be allowed to finish yet.
    // ssc.stop()
    // start the receivers
    ssc.start()
    // wait for the streaming context to terminate
    ssc.awaitTermination()

    ssc.stop()
  }
}
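
For a quick local test, something has to be listening on port 9999 before the job starts; a netcat listener such as nc -lk 9999 is one option, and every line typed into it is then split and counted by the stream.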

QueueStream

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Queue")
    val streamingContext = new StreamingContext(sparkConf, Seconds(3))

    val rddQueue = new mutable.Queue[RDD[Int]]()

    val inputStream: InputDStream[Int] = streamingContext.queueStream(rddQueue, oneAtATime = false)

    val mappedStream: DStream[(Int, Int)] = inputStream.map(((_: Int), 1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey((_: Int) + (_: Int))
    reducedStream.print()

    streamingContext.start()

    for (i <- 1 to 5){
      rddQueue += streamingContext.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    streamingContext.awaitTermination()
  }
}

MyReceiver: a custom receiver

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

object Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Queue")
    val streamingContext = new StreamingContext(sparkConf, Seconds(3))

    val messageDS: ReceiverInputDStream[String] = streamingContext.receiverStream(new MyReceiver())
    messageDS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
    private var flag = true

    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flag){
            val message: String = "collected data: " + new Random().nextInt(10).toString
            // hand each generated record to Spark
            store(message)
            Thread.sleep(5000)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }
}
