美文网首页
Spark应用学习笔记

Spark应用学习笔记

作者: 卡卡xx | 来源:发表于2018-08-25 22:57 被阅读0次

Sqoop

sqoop 是 apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具。
核心的功能有两个:

  • 导入、迁入
  • 导出、迁出

导入数据:MySQL,Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统
导出数据:从 Hadoop 的文件系统中导出数据到关系数据库 mysql 等 Sqoop 的本质还是一个命令行工具,和 HDFS,Hive 相比,并没有什么高深的理论。
本质就是迁移数据, 迁移的方式:就是把sqoop的迁移命令转换成MR程序

安装Sqoop

使用CDH安装组件直接添加服务即可,相关配置文件会自动修改。sqoop就是一个工具, 只需要在一个节点上进行安装即可,安装版本为Sqoop2。



安装完成并重新部署集群后,去sqoop目录下检查安装是否成功和相应的端口是否开启 检测是否可以连接数据库 如果报错就把连接数据库的jar放到它的lib下面。它的作用就是把关系型数据库导入hive、hdfs之类的hadoop系统里,在下面会用到。

Spark sql

前面我们学习了Hive,它是运行在Hadoop上的SQL-on-Hadoop工具,但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:Shark(spark sql的前身),Hive和它的运行架构如下图:


SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便
使用Spark SQL库可以对存储在RDD、批处理文件、JSON数据集或Hive表中的数据执行SQL查询。而且性能提高了10-100倍
sparkSQL层级
当我们想用sparkSQL来解决我们的需求时,其实说简单也简单,就经历了三步:读入数据 -> 对数据进行处理 -> 写入最后结果,那么这三个步骤用的主要类其实就三个:读入数据和写入最后结果用到两个类HiveContext和SQLContext,对数据进行处理用到的是DataFrame类,此类是你把数据从外部读入到内存后,数据在内存中进行存储的基本数据结构,在对数据进行处理时还会用到一些中间类,用到时在进行讲解。如下图所示:
HiveContext和SQLContext

HiveContext继承自SQLContext,因为HQL和SQL有一定的差别,所以有两个引擎。使用不同的读数据的类,底层会进行标记,自动识别是使用哪个类进行数据操作,然后采用不同的执行计划执行操作。
当从hive库中读数据的时候,必须使用HiveContext来进行读取数据,不然在进行查询的时候会出一些奇怪的错。其他的数据源两者都可以选择,但是最好使用SQLContext来完成。因为其支持的sql语法更多。

读数据

  • 这里的数据来源是包含很多.sql的文件夹,所以第一步是导入.sql到mysql数据库
    • 进入到employees_db所文件夹中
    • 执行:mysql -u root -p <employees.sql (注意不要添加;号)
    • 查看导入情况


  • 将mysql中数据导出到Hive中
    这里就正好要使用Sqoop的功能了。从mysql导出到Hive的之前,首先要在Hive中创建一个数据库和结构一样的表:
    • 创建数据库
      create database employees_db;
    • 创建和mysql中一样的表
      $ sqoop create-hive-table --connect jdbc:mysql://master/metastore --username hive -- password hive --table employees
    • 拷贝数据

      但是在用sqoop2的时候总是出字符问题(估计是sqoop2自身问题或者JDK不匹配问题),所以暂时该用Sogou数据
  • 从Hive中读数据(使用的spark shell)
    为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。
    在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。
    同理,spark的conf也是在/etc/spark/conf。
    import org.apache.spark.sql.hive.HiveContext
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("use mytest")
    
    • 查看所有列
      sqlContext.sql("select * from sogou limit 5").show()
    • 列最大
      sqlContext.sql("select max(rank) from sogou").show()
    • 列最小
      sqlContext.sql("select min(rank) from sogou").show()
    • 列平均
      sqlContext.sql("select avg(rank) from sogou").show()
    • 列方差
      sqlContext.sql("select variance(rank) from sogou").show()
    • 记录总数
      sqlContext.sql("select count(*) from sogou").show()

Mlib

MLlib是Spark里的机器学习库。它的目标是使实用的机器学习算法可扩展并容易使用。它提供如下工具:
1.机器学习算法:常规机器学习算法包括分类、回归、聚类和协同过滤。
2.特征工程:特征提取、特征转换、特征选择以及降维。
3.管道:构造、评估和调整的管道的工具。
4.存储:保存和加载算法、模型及管道
5.实用工具:线性代数,统计,数据处理等。

具体的代码直接参照官网的教程即可,非常详细,只需要跟着写就可以了。

由于日志太多,只能看到部分结果。


官网教程
  • 分类
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession

object classfier {

  def main(args: Array[String]) {
    // Load the data stored in LIBSVM format as a DataFrame.
    val spark=SparkSession.builder
      .master("local[4]")
      .appName("classifier")
      .getOrCreate()
    val data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")

    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features, and index them.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
      .fit(data)

    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

    // Train a DecisionTree model.
    val dt = new DecisionTreeClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")

    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    // Chain indexers and tree in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)

    // Make predictions.
    val predictions = model.transform(testData)

    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)

    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")

    val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
    println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
  }

}

  • 聚类
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

object clustering {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]","WordCount")
    // Load and parse the data
    val data = sc.textFile("data/Kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

    // Cluster the data into two classes using KMeans
    val numClusters = 6
    val numIterations = 20
    val clusters = KMeans.train(parsedData, numClusters, numIterations)

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    val WSSSE = clusters.computeCost(parsedData)
    println(s"Within Set Sum of Squared Errors = $WSSSE")

    // Save and load model
    //clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
    //val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")

  }
}
  • 回归
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.LinearRegression

object linear {

  def main(args: Array[String]) {
    // Load training data
    val spark=SparkSession.builder
      .master("local[4]")
      .appName("linear")
      .getOrCreate()

    val training = spark.read.format("libsvm")
      .load("data/sample_linear_regression_data.txt")

    val lr = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(training)

    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Summarize the model over the training set and print out some metrics
    val trainingSummary = lrModel.summary
    println(s"numIterations: ${trainingSummary.totalIterations}")
    println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
    trainingSummary.residuals.show()
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
    println(s"r2: ${trainingSummary.r2}")
  }

}

相关文章

网友评论

      本文标题:Spark应用学习笔记

      本文链接:https://www.haomeiwen.com/subject/astgiftx.html