美文网首页
spark之rdd

spark之rdd

作者: 虚心若愚_b5c1 | 来源:发表于2020-01-08 15:22 被阅读0次

RDD的创建

从集合中创建

  1. makeRDD函数创建
    // 设定spark计算框架的运行环境
    val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建spark上下文
    val sc = new SparkContext(config)

    // 从集合中调用makeRDD创建RDD
    val list = sc.makeRDD(List(1,2,3,4))

    // 打印结果
    list.collect().foreach(println)
  1. parallelize函数创建
    // 设定spark计算框架的运行环境
    val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建spark上下文
    val sc = new SparkContext(config)

    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(List(1,2,3,4))

    // 打印结果
    list.collect().foreach(println)

从外部存储中创建

    // 读取文件,将文件内容一行一行读取出来
    // 默认情况下可以读取项目路径,也可以读取其他路径,HDFS
    // 默认从文件读取的数据都是字符串类型
    // 读取文件时,传递的分区参数为最小分区数,但不一定是这个分区数,取决于hadoop文件的时分片规则
    val lines : RDD[String] = sc.textFile("in",2)

从其他RDD创建

RDD的转换

value类型

  1. map(func)的案例
    作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(1 to 10)

    // 给每个元素乘以2组成新值
    val newValue = list.map(x => x*2)

    newValue.foreach(println)
  1. flatmap(func)的案例
    作用:类似于map,但是每一个输入元素可以被映射为0个或者多个输出元素(所以func应返回一个序列,而不是单一元素)
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(Array(List(1,2), List(3, 4)))

    // flatmap 
    // 1,2,3,4
    val result = list.flatMap(list => list)

    result.collect().foreach(println)
  1. groupBy(func)的案例
    作用:分组,按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(1 to 10)
    
    val groupDatas = list.groupBy(x => x%2)

    groupDatas.collect().foreach(println)
  1. filter(func)的案例
    作用:过滤,返回一个新的RDD,该RDD由经过func函数计算后返回为true的输入元素组成
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(1 to 10)

    val filterNums = list.filter(x => x%2==0)

    filterNums.collect().foreach(println)
  1. distinct([numTasks])的案例
    作用:对源RDD进行去重后返回一个新的RDD,默认情况下,只有8个并行任务来操作,但是可以传入一个numTasks参数改变它
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(List(1,2,3,4,1,2))

    // 去重
    list.distinct().collect().foreach(println)
  1. sortBy(func,[ascending],[numTasks])的案例
    作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(List(1,2,3,4,1,2))

    // 降序
    val result = list.sortBy(x => x,  false)

    result.collect().foreach(println)

双value类型

  1. union(otherDataset)的案例
    作用:对源RDD和参数RDD求并集后返回一个新的RDD
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(1 to 10)

    // 集合2
    val list2 = sc.parallelize(10 to 15)

    // 并集
    val lists = list1.union(list2).distinct().collect()

    lists.foreach(println)
  1. substract(otherDataset)的案例
    作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将被保留下来
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(1 to 10)

    // 集合2
    val list2 = sc.parallelize(10 to 15)

    val result = list1.subtract(list2)

    result.collect().foreach(println)

  1. intersection(otherDataset)的案例
    作用:对源RDD和参数RDD求交集饭后一个新的RDD
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(1 to 10)

    // 集合2
    val list2 = sc.parallelize(10 to 15)

    val result = list1.intersection(list2)

    result.collect().foreach(println)
  1. cartesian(otherDataset)的案例
    作用:笛卡尔积
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(1 to 10)

    // 集合2
    val list2 = sc.parallelize(10 to 15)

    val result = list1.cartesian(list2)

    result.collect().foreach(println)

  1. zip(otherDataset)的案例
    作用:将两个RDD组合成key/value形式的RDD,这里默认两个RDD的partition数量以及元素数量相同,否则会抛出异常
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(Array("x","y","z"), 2)

    // 集合2
    val list2 = sc.parallelize(Array(1,2,3), 2)

    val result = list1.zip(list2)
    // (x,1) (y,2) (z,3)
    result.collect().foreach(println)

key-value类型

  1. groupByKey的案例
    作用:groupByKey也是对每个key进行分组,但只生成一个sequence
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(Array("hello", "hello", "world"))

    // 组装成map
    val map = list.map(x => (x, 1))

    val group = map.groupByKey()

    group.collect().foreach(println)
  1. reduceByKey(func,[numTasks])案例
    作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同的key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数设置
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(Array("hello", "hello", "world"))

    // 组装成map
    val map = list.map(x => (x, 1))

    val group = map.reduceByKey(_+_)

    group.collect().foreach(println)
  1. aggregateByKey的案例
    参数:(zeroValue:U,)
  1. sortByKey([ascending],[numTasks])的案例
    作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
   // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))

    // 降序
    val result = list.sortByKey( false)
    
    // (5,b) (3,c) (1,a)
    result.collect().foreach(println)
  1. mapValues的案例
    针对于(K,V)的形式的类型只对V进行操作
    // 从集合中调用makeRDD创建RDD
    val list = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))

    // 降序
    val result = list.mapValues(x => x.concat("|||"))

    result.collect().foreach(println)
  1. join(otherDataset,[numTasks])的案例
    作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
    // 从集合中调用makeRDD创建RDD
    val list1 = sc.parallelize(Array((1,"a"),(5, "b"),(3,"c")))

    val list2 = sc.parallelize(Array((1,"f"),(5, "g"),(4,"h")))

    val result = list1.join(list2)

    //(1,(a,f)) (5,(b,g))
    result.collect().foreach(println)

action

  1. reduce(func)的案例
    作用:通过func函数聚集RDD中的所有元素,先聚合分区内的数据,再聚合分区间的数据
    // 从集合中调用makeRDD创建RDD
    val list = sc.makeRDD(1 to 10, 2)

    // 聚合RDD[int]中所有元素
    val sum = list.reduce(_+_)

    // 55
    print(sum)
  1. collect()案例
    作用:在驱动程序中,以数组的形式返回数据集的所有元素
    val result = sc.makeRDD(Array(1 to 10))

    result.collect()

3.count()案例
作用:返回RDD中元素的个数

  val result = sc.makeRDD(1 to 10)

  // 统计个数为10
  print(result.count())
  1. first()案例
    作用:返回RDD中的第一个元素
  2. tabke(n)案例
    作用:返回一个由RDD的前n个元素组成的数组
  3. takeOrdered(n)
    作用:返回该RDD排序后的前n个元素组成的数组
  4. countByKey()
    作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的个数
    val map = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1)))

    val result = map.countByKey()

    println(result)
  1. foreach(func)的案例
    作用:在数据集的每一个元素上,运行函数fucn进行更新

数据的读取和保存

文件数据读取与保存

  1. text文件

数据读取:text(String)

// 读取文件,将文件内容一行一行读取出来
// 默认情况下可以读取项目路径,也可以读取其他路径,HDFS
// 默认从文件读取的数据都是字符串类型
val lines : RDD[String] = sc.textFile("in")

数据保存:saveAsTextFile

list.saveAsTextFile("output")
  1. json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析

// 读取json文件
val file = sc.textFile("in/user.json")

val json = file.map(JSON.parseFull)

json.foreach(println)

user.json(注意:每行不能用逗号隔开)

{"name": "xuwei", "age": 10}
{"name": "xuwei2", "age": 101}
{"name": "xuwei2", "age": 103}

输出结果

Some(Map(name -> xuwei2, age -> 103.0))
Some(Map(name -> xuwei, age -> 10.0))
Some(Map(name -> xuwei2, age -> 101.0))

文件系统类数据读取与保存

  1. mysql读取与保存

mysql读取
支持通过Java jdbc访问关系数据库,需要通过JdbcRDD进行
(1) 添加依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>

(2) mysql读取

    // 设定spark计算框架的运行环境
    val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建spark上下文
    val sc = new SparkContext(config)

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://192.168.6.70:3306/watson"
    val userName = "watson"
    val password = "watson@150922"

    val sql = "select * from user where userid >= ? and userid<= ?"
    val jdbc = new JdbcRDD(
      sc,
      () => {
        // 获取数据库的连接对象
        //Class.forName(driver)
        java.sql.DriverManager.getConnection(url, userName, password)
      },
      sql,
      1,
      3,
      2,
      (rs)=>{
        println(rs.getString(1)+","+rs.getString(2))
      }

    )
    jdbc.collect()
  }

(3) mysql保存

 // 设定spark计算框架的运行环境
    val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建spark上下文
    val sc = new SparkContext(config)

    val data = sc.makeRDD(List(("xuwei", "test")))

    val url = "jdbc:mysql://192.168.6.70:3306/watson"
    val userName = "watson"
    val password = "watson@150922"

    data.foreachPartition(datas =>{
      val connection = java.sql.DriverManager.getConnection(url, userName, password)
      datas.foreach{
        case (userName, password)=>{
          val sql = "insert into user(username, password) values (?,?)"
          val statement = connection.prepareStatement(sql)
          statement.setString(1, userName)
          statement.setString(2, password)
          statement.executeUpdate()

          statement.close()
        }
      }
      connection.close()
    })

  }
  1. hbase读取与保存

相关文章

  • Spark RDD Api使用指南

    ​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换和行动操作。在进行spark程...

  • Spark源码之DAGScheduler

    Spark源码之DAGScheduler介绍篇 Spark Application中的RDD经过一系列的Trans...

  • 2 通过案例对SparkStreaming透彻理解之二

    Spark Core是基于RDD形成的,RDD之间都会有依赖关系。而Spark Streaming是在RDD之上增...

  • Spark Scheduler内部原理剖析

    通过文章“Spark核心概念RDD”我们知道,Spark的核心是根据RDD来实现的,Spark Scheduler...

  • Spark 之RDD

    为什么要设计RDD 网上资料很多,这里我给罗列出来,许多的迭代算法和交互式数据挖掘工具,这些应用场景的共同点是:...

  • spark之rdd

    RDD的创建 从集合中创建 makeRDD函数创建 parallelize函数创建 从外部存储中创建 从其他RDD...

  • Spark之RDD

    最近在学习Spark, 网上搜集了一些学习文章,把便于自己理解的揉杂在一起,方便自己学习回顾。 RDD概念 RDD...

  • Spark之Rdd

    注:以下代码scala版本采用 2.11.12 spark的版本采用spark-2.4.5-bin-hadoop2...

  • Spark Core - 编程基础

    RDD编程 什么是RDD RDD是Spark的基石,是实现Spark数据处理的核心抽象。RDD是一个抽象类,它代表...

  • Spark中对RDD的理解

    Spark中对RDD的理解 简介 what the is RDD? RDD(Resilient Distribut...

网友评论

      本文标题:spark之rdd

      本文链接:https://www.haomeiwen.com/subject/wwrsactx.html