Spark08 RDD KV数据

作者: 山高月更阔 | 来源:发表于2020-05-31 16:29 被阅读0次

    键值对 RDD 是 Spark 中许多操作所需要的常见数据类型。Spark 为包含简直对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。

    创建 Pair RDD

    在 Spark 中有很多种方法创建 Pair RDD。很多存储键值对的数据格式会在读取值时直接返回其键值对数据组成的 pair RDD。普通的 RDD 可通过 map 转换为 pair RDD。

    构建键值对 RDD 的方法在不同语言中会有所不同。在 Python 中,需要返回一个二元组组成的 RDD

    >>> rdd=sc.parallelize(['zhang 1','li 2'])
    >>> pairs = rdd.map(lambda x : (x.split(' ')[0],x.split(' ')[1]))
    >>> pairs.collect()
    [('zhang', '1'), ('li', '2')]
    

    Pair RDD 的转换操作

    以键值对集合[(1,2),(3,4),(3,6)] 为例

    函数名 目的 实例 结果
                                     
    reduceByKey                                                                       
    合并具有相同键的值 rdd.reduceByKey(lambda x,y : x + y) [('3', '46'), ('1', '2')]
    groupByKey 对具有相同键值进行分组 rdd.groupByKey() [(1,[2]),(3,[4,6])]
    combineByKey 使用不同的返回类型合并具有相同的键的值
    mapValues(func) 对 pari RDD 的每个值应用一个函数而不改变键 rdd.mapValues(lambda x : x + 1) [(1, 3), (3, 5), (3, 7)]
    flatMapValues 对 Pair RDD 中的每个值应用一个返回迭代器的函数,然后对每个元素都生成一个对应原键的键值对记录。 rdd.flatMapValues(lambda x : list(range(int(x)))) [('1', 0), ('1', 1), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 4), ('3', 5)]
    keys 返回一个仅包含键的 RDD rdd.keys() ['1', '3', '3']
    values 返回一个仅包含值的 RDD rdd.values() ['2', '4', '6']
    sortByKey 返回一个根据键排序的 RDD rdd.sortByKey() [('1', '2'), ('3', '4'), ('3', '6')]

    针对两个 pair RDD 的转化操作

    rdd = [(1,2),(3,4),(3,6)]
    other = [(3,9)]

    函数名 目的 实例 结果
                                     
    subtractByKey                                                                        
    删除 rdd 中键与 other 相同的键相同的元素 rdd.subtractByKey(other) [(1, 2)]
    join 对两个 RDD 进行内连接 rdd.join(other) [(3, (4, 9)), (3, (6, 9))]
    rightOuterJoin 对两个 RDD 进行右连接 rdd.rightOuterJoin(other) [(3, (4, 9)), (3, (6, 9))]
    leftOuterJoin 对两个 RDD 进行左连接 rdd.leftOuterJoin(other) [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
    rdd.cogroup 将两个 RDD 相同的键分组到一起 rdd.cogroup(other) [1,([2],[]),(3,([4,6],[9]))]

    行动操作

    例如:rdd = [(1,2),(3,4),(3,6)]

    函数名 目的 实例 结果
    countByKey 对每个键进对应的元素分别计数 rdd.countByKey() [(1, 1), (3, 2)]
    collectAsMap 将结果以映射表的形式返回, rdd.collectAsMap() {1:2,3:6}
    lookup 返回给定键的所有值 rdd.lookup(3) [4,6]

    其他操作

    键值对 RDD 也是 RDD 支持 RDD 所有算子 键值对 RDD 都支持
    举例
    Pyhton

    >>> rdd = sc.parallelize([(1,2),(3,4),(3,6)])
    # 筛选第二个元素大于3的元素
    >>> res_rdd = rdd.filter(lambda kv: kv[1] > 3)
    >>> res_rdd.collect()
    [(3, 4), (3, 6)]
    

    上述图标只是简单说明 键值对 RDD 的一些操作 下面通过一些实例来进一步说明其中一些操作。有些操作比较简单易懂,所以举例不会涵盖所有操作。

    求平均值

    给定一下数据求平均值,会设计 mapValues 和 reduceByKey

    key value
    a 1
    b 0
    b 3
    c 2
    d 6
    d 9
    d 7

    通过 mapValues 转换为如下结果

    key value
    a (1,1)
    b (0,1)
    b (3,1)
    c (2,1)
    d (6,1)
    d (9,1)
    d (7,1)

    value 为元组 元组第一个值位 key 对应的值 元组第二个值位 次数。要求平均值,需要就出所有值的和,和次数之和
    通过 reduceByKey 求出 值之和,和次数之和
    如下

    key value
    a (1,1)
    b (3,2)
    c (2,1)
    d (22,3)

    有了次数之和,和 value 之和 求平均值 value 之和除以次数

    完整代码如下
    Python

    >>> rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
    >>> map_rdd = rdd.mapValues(lambda x : (x,1))
    >>> result = map_add.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])
    >>> result.collect()
    [('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
    # 计算平均值
    >>> result = result.mapValues(lambda x : x[0] / x[1])
    [('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]
    

    combineByKey 算子说明

    combineByKey 算子的参数较多 在表格中没有举例用法
    combineByKey 有三个参数
    createCombiner: 初始化方法,如果是新元素 combineByKey 都会调用改方法来初始化,注意:这一过程会在每个分区中第一次出现各个键时发生。而不是在整个 RDD 中第一次出现一个键时发生
    mergeValue: 分区内合并方法
    mergeCombiners:分区间合并方法
    利用 combineByKey 求平均值
    Python

     rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
    >>> res_rdd = rdd.combineByKey((lambda x:(x,1)),(lambda x,y:(x[0]+y,x[1]+1)),(lambda x,y:(x[0]+y[0],x[1]+y[1])))
    >>> res_rdd.collect()
    [('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
    # 计算平均值
    >>> res_rdd = res_rdd.mapValues(lambda x : x[0] / x[1])
    [('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]
    

    combineByKey 处理数据流程图
    原数据为:

    key value
    a 1
    b 0
    b 3
    c 2
    d 6
    d 9
    d 7

    假设分为3个区
    分区一:

    a 1
    b 0
    b 3

    分区二:

    c 2
    d 6
    d 9

    分区三:

    d 7

    分区中合并间 后:
    这里调用 (lambda x,y:(x[0]+y,x[1]+1)) 值相加 次数+1
    如果遇到新元素会调用 (lambda x:(x,1)), 转换为 值-次数的 kv结构
    分区一:

    a (1,1)
    b (3,2)

    分区二:

    c (2,1)
    d (15,2)

    分区三:

    d (7,1)

    分区见合并
    调用方法: (lambda x,y:(x[0]+y[0],x[1]+y[1]))
    所有值相加,次数相加

    a (1,1)
    b (3,2)
    c (2,1)
    d (22,3)

    sortByKey 算子

    sortByKey 两个参数
    ascending: 是否结果为升序 默认True。True 为升序 False 为降序
    keyfunc: 自定义排序规则
    例:
    Python

    sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)):
    

    相关文章

      网友评论

        本文标题:Spark08 RDD KV数据

        本文链接:https://www.haomeiwen.com/subject/qvlzahtx.html