美文网首页
Spark学习(九):分布式矩阵

Spark学习(九):分布式矩阵

作者: CocoMama190227 | 来源:发表于2019-03-20 16:37 被阅读0次

十余天没有学习Spark了,不是我在偷懒,而是前段时间一直在研究形态学算法提取波形的问题。现在算法差不多搞定啦,但是用Python开发的,有空有能力时再补上Spark的程序。还是先来记录一下分布式矩阵的用法吧~

一般采用分布式矩阵进行存储都在数据量非常大的情况下进行,处理速度和效率与其存储格式息息相关。MLlib提供了四种分布式矩阵存储形式,分别为:行矩阵,带有行索引的行矩阵,坐标矩阵和块矩阵,据说分块矩阵并不常用。

行矩阵

行矩阵以行作为基本的矩阵存储格式,每一行的内容都可以单独取出来进行操作,列的作用相较小。

带索引的行矩阵

为了方便在系统调试的过程中对行矩阵的内容进行观察和显示,MLlib提供了带索引的行矩阵。

坐标矩阵

坐标矩阵是一种带有坐标标记的矩阵,其中的每一个具体数据都有一组坐标进行标示,类型格式如下:
(x: Long, y: Long, values: Double)

分块矩阵

顾名思义,就是将矩阵分块(好废话哦)。分块矩阵可由带索引的行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换。

例程

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Matrix, Matrices, Vectors, Vector}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry}

object RDDMatrix {
  def main(args: Array[String]): Unit = {
    println("--------------------------本地矩阵-------------------------------")
    val mx = Matrices.dense(2, 3, Array(1, 2, 3, 4, 5, 6)) // 创建2行3列的本地矩阵,Matrices.dense是矩阵重组的调用方法
    println(mx)

    println("--------------------------分布式行矩阵------------------------------")
    val conf = new SparkConf().setAppName("Distributed matrix").setMaster("local")
    val sc = new SparkContext(conf)
    val path = "F:/ScalaProject/test/collaborativeFilter/src/main/resources/Kmeans.txt"
    val rdd = sc.textFile(path).map(_.split(" ").map(_.toDouble)) // 转化成Double类型的向量存储
    val rdd1 = rdd.map(line => Vectors.dense(line))  // 转换成向量存储
    val rm = new RowMatrix(rdd1) // 读入行矩阵
    // 如果打印rm中的具体内容,结果显示是数据的内存地址。这表明RowMatrix只是一个转化操作,并不运行最终结果。
    println(rm.numRows())         // 打印行数
    println(rm.numCols())         // 打印列数

    println("--------------------------带索引的行矩阵---------------------------")
    val rdd2 = rdd1.map(vd => new IndexedRow(vd.size, vd)) // 转化格式
    val irm = new IndexedRowMatrix(rdd2)  // 建立索引行矩阵实例
    println(irm.getClass) // 打印类型
    irm.rows.foreach(println)// 打印内容数据

    println("---------------------------坐标矩阵--------------------------------")
    val rdd3 = rdd.map(vue => (vue(0).toLong, vue(1).toLong, vue(2))).  // 转化成坐标格式
      map(vue2 => new MatrixEntry(vue2._1, vue2._2, vue2._3))           // 转化成坐标矩阵格式
    // vue(0)和vue(1)分别是行和列坐标的坐标轴标号,vue(2)是具体内容
    // ._1 和 ._2 是scala语句中元组参数的序数专用标号,分别是传入第二个和第三个值
    val crm = new CoordinateMatrix(rdd3)  // 直接打印CoordinateMatrix实例的对象也仅仅是内存地址
    crm.entries.foreach(println)


    println("--------------------------分块矩阵---------------------------")
    // 将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
    val matA = irm.toBlockMatrix(2,2).cache()
    // 查看其分块情况
    matA.blocks.collect.foreach(println)
    println(matA.numColBlocks)
    println(matA.numRowBlocks)
  }
}

原始数据:

1 2 2
1 1 1
1 3 3
2 2 2
3 4 5
4 3 3
2 2 2
4 4 1

运行结果:

--------------------------本地矩阵-------------------------------
1.0  3.0  5.0  
2.0  4.0  6.0  
--------------------------分布式行矩阵------------------------------
8
3
--------------------------带索引的行矩阵---------------------------
class org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
IndexedRow(3,[1.0,2.0,2.0])
IndexedRow(3,[1.0,1.0,1.0])
IndexedRow(3,[1.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[3.0,4.0,5.0])
IndexedRow(3,[4.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[4.0,4.0,1.0])
---------------------------坐标矩阵--------------------------------
MatrixEntry(1,2,2.0)
MatrixEntry(1,1,1.0)
MatrixEntry(1,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(3,4,5.0)
MatrixEntry(4,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(4,4,1.0)
--------------------------分块矩阵---------------------------
((1,1),2 x 1 CSCMatrix
(1,0) 19.0)
((1,0),2 x 2 CSCMatrix
(1,0) 18.0
(1,1) 21.0)
2
2

至于分块矩阵为什么会输出这么奇怪的结果,还没有研究明白,等搞明白再补上吧!

相关文章

网友评论

      本文标题:Spark学习(九):分布式矩阵

      本文链接:https://www.haomeiwen.com/subject/msgzuqtx.html