创建RDD
inputRDD = sc.textFile("log.txt")
lines = sc.parallelize(["pandas", "i like pandas"]) //用于测试
RDD操作
转化操作
产生新的RDD的操作,转化操作是惰性求值的,只有遇到行动操作时,才真正执行。
行动操作
对RDD进行实际的计算。
常见的RDD操作
转化操作
常见的转化操作包含map、flatMap、filter操作,也包含distinct、union、intersection、subtract、cartesian等伪集合操作。
操作 | 解释 |
---|---|
map | 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD 中对应元素的值 |
filter | 接收一个函数,并将 RDD 中满足该函数的 元素放入新的 RDD 中返回 |
flatmap | 接收一个函数被分别应用到了输入 RDD 的每个元素上。不 过返回的不是一个元素,而是一个返回值序列的迭代器。 |
distinct | 生成一个只包含不同元素的新 RDD |
union | 接收另一个 RDD 作为参数,返回一个包含两个 RDD 中所有元素的 RDD(包含重复的) |
intersection | 接收另一个 RDD 作为参数,返回两个 RDD 中都有的元素。 |
subtract | 接收另一个 RDD 作为参数,返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。 |
cartesian | 接收另一个 RDD 作为参数,会返回 所有可能的 (a, b) 对,其中 a 是源 RDD 中的元素,而 b 则来自另一个 RDD。 |
例如:a.cartesian(b) => (a1,b1), (a1,b2), (a1,b3), (a2,b1), (a2,b2), (a2,b3)
注意:distinct、intersection、substract等操作会发生混洗(shuffle),影响性能。
行动操作
操作 | 解释 |
---|---|
reduce | 接收一个函数作为参数,这个 函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素。 |
fold | 接收一个与 reduce() 接收的函数签名相同的函数,再加上一个 “初始值”来作为每个分区第一次调用时的结果。 |
aggregate | 把我们从返回值类型必须与所操作的 RDD 类型相同的限制中解放出 来。与 fold() 类似,使用 aggregate() 时,需要提供我们期待返回的类型的初始值。然后 通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加 的,最终,还需要提供第二个函数来将累加器两两合并。 |
collect | 返回 RDD 中的所有元素 |
top(num) | 从 RDD 中返回最前面的 num 个元素 |
count | RDD 中的元素个数 |
countByValue | 各元素在 RDD 中出现的次数 |
take(num) | 从 RDD 中返回 num 个元素 |
takeOrdered | 从 RDD 中按照提供的顺序返 回最前面的 num 个元素 |
takeSample | 从 RDD 中返回任意一些元素 |
foreach | 对 RDD 中的每个元素使用给 定的函数 |
rdd.fold(0)((x, y) => x + y)
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
持久化
调用 persist() 方法进行持久化,persist() 调用本身不会触发强制求值。RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
网友评论