美文网首页
【2019-06-23】RDD编程

【2019-06-23】RDD编程

作者: BigBigFlower | 来源:发表于2019-06-23 15:34 被阅读0次
image.png

(1)弹性分布数据集

RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集。分区定义了spark中数据的并行单位。Spark 框架并行处理多个分区,一个 分区内的数据对象则是顺序处理。创建 RDD 最简单的方法是在本地对象集合上调用 SparkContext 的 parallelize 方法。

(2)RDD基础

#在python中使用textFile创建一个字符串的RDD
'''
测试文本abc.txt
Jimmy   A   1
Jimmy   B   8
Sam C   10
Tom A   7
'''
lines=sc.textFile("/user/test/abc.txt")
pythonlines=lines.filter(lambda line:"Jimmy" in line)
 pythonlines.first()
#u'Jimmy\tA\t1'

spark或shell 会话都按照如下方式工作:
(i)从外部数据创建出输入RDD
(ii)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(iii)告诉spark对需要被重用的中间结果RDD执行persist()操作。
(iv)使用行动操作来触发一次并行计算,spark会对计算进行优化后再执行。

(3)创建RDD

#python 中的parallelize()方法
lines=sc.parallelize(["pandas","i like pands"])

(4)RDD操作
RDD 操作两种类型:转化操作和行动操作

# filter 转化操作
lines=sc.textFile("/user/test/abc.txt")
flines=lines.filter(lambda x:"Sam" in x)

# union() 转化操作
glines=lines.filter(lambda x:"Tom" in x)
linesunion=flines.union(glines)

# 行动操作,计数
print linesunion.count()
# 2

(5)向spark传递函数

# 在python中传递函数 
word=lines.filter(lambda s:"Tom" in s)
def containsTom(s):
  return "Tom" in s
word=lines.filter(containsTom)

在scala中,可以把定义的内联函数、方法的引用或静态方法传递给spark。

#scala 中传递函数
import org.apache.spark.rdd.RDD
class SearchFunction(val query:String){
      def isMatch(s:String):Boolean={
      s.contains(query)
      }
      def getMatchedNoReference(rdd:RDD[String]):RDD[String]={
      val query_ = this.query
     rdd.flatMap(x => x.split(query_))
    }
}

(4)常见的转化操作和行动操作
基本RDD
--针对各个元素的转化操作
最常用的转化操作是map()和filter()。转化操作map()接收一个函数,把这个函数用于RDD的每个元素,将函数的返回结果作为结果RDD中的对应元素的值。

map

# python 中计算RDD中各个值的平方
nums=sc.parallelize([1,2,3,5])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
      print "%i" %(num)
'''
1
4
9
25
'''
// scala中计算RDD中各个值的平方
val input=sc.parallelize(List(1,2,3,4))
val result=input.map(x=>x*x)
println(result.collect().mkString(","))
// 1,4,9,16  

flatMap

#python 中的flatMap()将行数据切分为单词
lines = sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()
# hello

//scala 中的flatMap()中将行数据切分为单词
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line=>line.split(" "))
words.first()
//res1: String = hello

伪集合操作
最简单的集合操作是union(other),返回一个包含两个RDD中所有元素的RDD。RDD.distinct()转化操作可以生成一个包含不同元素的新RDD,distinct开销很大,需要将所有数据通过网络进行混洗(shuffle)。
spark 提供了intersection(other)方法,只返回两个RDD中都有的元素。intersection会在运行时去掉所有重复的元素(单个RDD内重复的元素也会一起移除),intersection()和union()的概念类似,但是intersection性能差很多,它需要通过网络混洗数据来发现共有的元素。
计算笛卡尔积 cartesian(other)转化操作会返回所有可能的(a,b)对。
单个RDD操作
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD,rdd.map(x=>x+1)
flatMap()将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来切分单词。rdd.flatMap(x=>x.to(3))
filter() 返回一个由通过传给filter()的函数的元素组成吃的RDD。rdd.filter(x=>x!=1)
distinct()去重 rdd.distinct()
sample(withReplacement,fraction,[seed]) 采样,是否替换。 rdd.sample(false,0.5)

两个RDD操作
union 生成一个包含两个RDD中所有元素的RDD。rdd.union(other)
intersection() 求两个RDD共同的元素的RDD。rdd.intersection(other)
subtract() 移除一个RDD中的内容。rdd.subtract()
cartesian() 与另一个RDD的笛卡尔积。 rdd.cartesian(other)

行动操作
基本的RDD最长见的行动操作reduce()。接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。

#python 中的reduce()
sum=rdd.reduce(lambda x,y:x+y)


//scala中的reduce()
val sum=rdd.reduce((x,y)=>x+y)

fold和reduce类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。
aggregate()函数把从返回值类型必须与所操作的RDD类型相同的限制中解放出来。

#python 中的aggregate()
nums=sc.parallelize([1,2,3,4])
seq0p=(lambda x,y:(x[0]+y,x[1]+1))
comb0p=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
nums.aggregate((0,0),seq0p,comb0p)
//(10, 4)



//scala中的aggregate()
val result=input.aggregate((0,0))(
(acc,value)=>(acc._1+value,acc._2+1),
(acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2)
)

基本的RDD行动操作
collect() 返回RDD中的所有元素。rdd.collect()
count() RDD中的元素个数。 rdd.count()
countByValue() 各元素在RDD中出现的次数。rdd.countByValue()
take(num) 从RDD中返回num个元素。rdd.take(num)
top(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.top()
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.takeOrdered(num)
takeSample(withReplacement,num,[seed]) 从RDD中返回一些任意元素。
reduce(func) 并行整合RDD中所有数据。rdd.reduce((x,y)=>x+y)
fold(zero)(func) 和reduce一样,但是需要初始值。rdd.fold(0)((x,y)=>x+y)
aggregate(zerovalue)(seq0p,comb0p)和reduce相似,但是通常返回不同类型的函数。rdd.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2)
foreach(func) 对RDD中的每个元素使用给定的函数。rdd.foreach(func)

(5)持久化(缓存)
SparkRDD是惰性求值的。有时候希望能多次使用同一个RDD,可以给RDD选择不同的持久化级别。在Scala和Java中,默认情况下,persist()会把数据以序列化的形式缓存在JVM的堆空间中,在python中,会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列话后的对象存储在JVM的堆空间中。

持久化级别
//scala中使用persist()
import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
)

相关文章

  • 【2019-06-23】RDD编程

    (1)弹性分布数据集 RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集。分区定义了spar...

  • spark(六)深入理解spark-core:RDD的原理与源码

    一.弹性分布式数据集(RDD) 本部分描述RDD和编程模型,首先讨论设计目标,然后定义RDD,讨论Spark的编程...

  • spark开发笔记(二、RDD编程笔记)

    RDD编程 RDD的基本概念 Spark编程模型是弹性分布式数据集(Resilient Distributed D...

  • PySpark-数据操作-RDD

    更多信息https://blue-shadow.top/ RDD 编程基础 相关概念 pair RDD ...

  • Spark系列2 - Spark RDD编程

    一、RDD 编程基础 1 RDD 创建 从文件系统创建RDD textFile textFile参数说明:若参数是...

  • 从零开始学习Spark(三)RDD编程

    RDD编程 RDD (Resilient Distributed Dataset 弹性分布式数据集)是Spark中...

  • RDD编程

    RDD基础 RDD:Resilient Distributed Datasets,弹性分布式数据集 分布在集群中的...

  • RDD编程

    1.读取数据,将RDD持久化到内存中,并进行行动操作。 2.一般常用的是从外部存储中读取来创建RDD,如Spark...

  • RDD编程

    RDD基础概念: RDD:弹性分布式数据集(Resilient Distributed Dataset),spar...

  • Spark Core - 编程基础

    RDD编程 什么是RDD RDD是Spark的基石,是实现Spark数据处理的核心抽象。RDD是一个抽象类,它代表...

网友评论

      本文标题:【2019-06-23】RDD编程

      本文链接:https://www.haomeiwen.com/subject/olnyqctx.html