美文网首页
Spark学习(九):分布式矩阵

Spark学习(九):分布式矩阵

作者: CocoMama190227 | 来源:发表于2019-03-20 16:37 被阅读0次

    十余天没有学习Spark了,不是我在偷懒,而是前段时间一直在研究形态学算法提取波形的问题。现在算法差不多搞定啦,但是用Python开发的,有空有能力时再补上Spark的程序。还是先来记录一下分布式矩阵的用法吧~

    一般采用分布式矩阵进行存储都在数据量非常大的情况下进行,处理速度和效率与其存储格式息息相关。MLlib提供了四种分布式矩阵存储形式,分别为:行矩阵,带有行索引的行矩阵,坐标矩阵和块矩阵,据说分块矩阵并不常用。

    行矩阵

    行矩阵以行作为基本的矩阵存储格式,每一行的内容都可以单独取出来进行操作,列的作用相较小。

    带索引的行矩阵

    为了方便在系统调试的过程中对行矩阵的内容进行观察和显示,MLlib提供了带索引的行矩阵。

    坐标矩阵

    坐标矩阵是一种带有坐标标记的矩阵,其中的每一个具体数据都有一组坐标进行标示,类型格式如下:
    (x: Long, y: Long, values: Double)

    分块矩阵

    顾名思义,就是将矩阵分块(好废话哦)。分块矩阵可由带索引的行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换。

    例程

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.{Matrix, Matrices, Vectors, Vector}
    import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry}
    
    object RDDMatrix {
      def main(args: Array[String]): Unit = {
        println("--------------------------本地矩阵-------------------------------")
        val mx = Matrices.dense(2, 3, Array(1, 2, 3, 4, 5, 6)) // 创建2行3列的本地矩阵,Matrices.dense是矩阵重组的调用方法
        println(mx)
    
        println("--------------------------分布式行矩阵------------------------------")
        val conf = new SparkConf().setAppName("Distributed matrix").setMaster("local")
        val sc = new SparkContext(conf)
        val path = "F:/ScalaProject/test/collaborativeFilter/src/main/resources/Kmeans.txt"
        val rdd = sc.textFile(path).map(_.split(" ").map(_.toDouble)) // 转化成Double类型的向量存储
        val rdd1 = rdd.map(line => Vectors.dense(line))  // 转换成向量存储
        val rm = new RowMatrix(rdd1) // 读入行矩阵
        // 如果打印rm中的具体内容,结果显示是数据的内存地址。这表明RowMatrix只是一个转化操作,并不运行最终结果。
        println(rm.numRows())         // 打印行数
        println(rm.numCols())         // 打印列数
    
        println("--------------------------带索引的行矩阵---------------------------")
        val rdd2 = rdd1.map(vd => new IndexedRow(vd.size, vd)) // 转化格式
        val irm = new IndexedRowMatrix(rdd2)  // 建立索引行矩阵实例
        println(irm.getClass) // 打印类型
        irm.rows.foreach(println)// 打印内容数据
    
        println("---------------------------坐标矩阵--------------------------------")
        val rdd3 = rdd.map(vue => (vue(0).toLong, vue(1).toLong, vue(2))).  // 转化成坐标格式
          map(vue2 => new MatrixEntry(vue2._1, vue2._2, vue2._3))           // 转化成坐标矩阵格式
        // vue(0)和vue(1)分别是行和列坐标的坐标轴标号,vue(2)是具体内容
        // ._1 和 ._2 是scala语句中元组参数的序数专用标号,分别是传入第二个和第三个值
        val crm = new CoordinateMatrix(rdd3)  // 直接打印CoordinateMatrix实例的对象也仅仅是内存地址
        crm.entries.foreach(println)
    
    
        println("--------------------------分块矩阵---------------------------")
        // 将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
        val matA = irm.toBlockMatrix(2,2).cache()
        // 查看其分块情况
        matA.blocks.collect.foreach(println)
        println(matA.numColBlocks)
        println(matA.numRowBlocks)
      }
    }
    

    原始数据:

    1 2 2
    1 1 1
    1 3 3
    2 2 2
    3 4 5
    4 3 3
    2 2 2
    4 4 1
    

    运行结果:

    --------------------------本地矩阵-------------------------------
    1.0  3.0  5.0  
    2.0  4.0  6.0  
    --------------------------分布式行矩阵------------------------------
    8
    3
    --------------------------带索引的行矩阵---------------------------
    class org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
    IndexedRow(3,[1.0,2.0,2.0])
    IndexedRow(3,[1.0,1.0,1.0])
    IndexedRow(3,[1.0,3.0,3.0])
    IndexedRow(3,[2.0,2.0,2.0])
    IndexedRow(3,[3.0,4.0,5.0])
    IndexedRow(3,[4.0,3.0,3.0])
    IndexedRow(3,[2.0,2.0,2.0])
    IndexedRow(3,[4.0,4.0,1.0])
    ---------------------------坐标矩阵--------------------------------
    MatrixEntry(1,2,2.0)
    MatrixEntry(1,1,1.0)
    MatrixEntry(1,3,3.0)
    MatrixEntry(2,2,2.0)
    MatrixEntry(3,4,5.0)
    MatrixEntry(4,3,3.0)
    MatrixEntry(2,2,2.0)
    MatrixEntry(4,4,1.0)
    --------------------------分块矩阵---------------------------
    ((1,1),2 x 1 CSCMatrix
    (1,0) 19.0)
    ((1,0),2 x 2 CSCMatrix
    (1,0) 18.0
    (1,1) 21.0)
    2
    2
    

    至于分块矩阵为什么会输出这么奇怪的结果,还没有研究明白,等搞明白再补上吧!

    相关文章

      网友评论

          本文标题:Spark学习(九):分布式矩阵

          本文链接:https://www.haomeiwen.com/subject/msgzuqtx.html