6.spark core之键值对操作

作者: java大数据编程 | 来源:发表于2018-08-22 17:37 被阅读2次

  键值对RDD(pair RDD)是spark中许多操作所需要的常见数据类型,通常用来进行聚合计算。

创建Pair RDD

  spark有多种方式可以创建pair RDD。比如:很多存储键值对的数据格式在读取时直接返回pair RDD;通过map()算子将普通的RDD转为pair RDD。

scala

# 使用第一个单词作为键创建一个pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

java

# 使用第一个单词作为键创建一个pair RDD
# jdk1.8后也支持lambda表达式方式
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

python

# 使用第一个单词作为键创建一个pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0], x))

  从一个内存中的数据集创建pair RDD时,scala和python只需要对这个二元组集合调用SparkContext的parallelize()方法即可;而java需要使用SparkContext.parallelizePairs()方法。

pair RDD转化操作

转化操作总览

针对单个Pair RDD的转化操作

函数名 作用 示例
reduceByKey(func) 合并具有相同键的值 rdd.reduceByKey((x, y) => x + y)
groupByKey() 对具有相同键的值进行分组 rdd.groupByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回类型合并具有相同键的值 rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键 rdd.mapValues(x => x + 1)
flatMapValues(func) 对pair RDD中的每个值应用一个返回迭代器的函数,生成对应原键的键值对记录 rdd.flatMapValues(x => (x to 5))
keys() 返回一个仅包含键的RDD rdd.keys
values() 返回一个仅包含值得RDD rdd.values
sortByKey() 返回一个根据键排序的RDD rdd.sortByKey()

针对两个Pair RDD的转化操作

函数名 作用 示例
subtractByKey 删除RDD中键与other RDD中键相同的元素 rdd.subtractByKey(other)
join 对两个RDD进行内连接 rdd.join(other)
leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) rdd.leftOuterJoin(other)
rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接) rdd.rightOuterJoin(other)
cogroup 将两个RDD中拥有相同键的数据分组在一起 rdd.cogroup(other)

聚合

  • 使用mapValues()和reduceByKey()计算每个键对应值的均值。

scala

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

python

rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
  • 使用flatMap()、map()和reduceByKey()计算单词统计

scala

val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

java

JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
   }
});
JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String x) {
    return new Tuple2(x, 1);
  }
}).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
)

python

rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
  • 使用combineByKey()返回与输入数据不同类型的返回值,求每个键对应的平均值
执行原理
    1.combineByKey()作用于rdd的每个分区。
    2.如果访问的元素在分区中第一次出现,就使用createCombiner()方法创建那个键对应累加器的初始值。
    3.如果访问的元素在当前分区已经出现过,就使用mergeValue()方法将该键的累加器对应的当前值和新值合并。
    4.如果有两个或多个分区都有对应同一个键的累加器时,就使用mergeCombiners()方法将各个分区的结果进行合并。

scala

val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}

java

public static class AvgCount implements Serializable {
    public int total_;
    public int num_;
    public AvgCount(int total, int num) {
        total_ = total;
        num_ = num;
    }
    public float avg() {
        return total_/(float)num_;
    }
}

Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
    public AvgCount call(Integer x) {
        return new AvgCount(x, 1);
    }
};

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
        a.total_ += x;
        a.num_ += 1;
        return a;
    }
};

Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
    }
};

AvgCount initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount> avgCounts = input.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}

python

sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

分组

  对于单个RDD数据进行分组时,使用groupByKey()。如果先使用groupByKey(),再使用reduce()或fold()时,可能使用一种根据键进行聚合的函数更高效。比如,rdd.reduceByKey(func)与rdd.groupByKey().mapValues(value => value.reduce(func))等价,但前者更高效,因为避免了为每个键存放值列表的步骤。
  对多个共享同一个键的RDD进行分组时,使用cogroup()。cogroup方法会得到结果RDD类型为[(K, (Iterable[V], Iterable[W]))]。

连接

  将一组有键的数据与另一组有键的数据连接使用是对键值对数据执行的常用操作。连接方式主要有:内连接、左外连接、右外连接。

val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")))
val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
# 内连接
storeAddress.join(storeRating)
#左外连接
storeAddress.leftOuterJoin(storeRating)
#右外连接
storeAddress.rightOuterJoin(storeRating)

排序

  将数据排序输出是很常见的场景。sortByKey()函数接收一个叫做ascending的参数,表示是否让结果升序排序(默认true)。有时,也可以提供自定义比较函数。比如,以字符串顺序对整数进行自定义排序。

scala

implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()

java

class IntegerComparator implements Comparator<Integer> {
    public int compare(Integer a, Integer b) {
        return String.valueOf(a).compareTo(String.valueOf(b))
    }
}
rdd.sortByKey(new IntegerComparator());

python

rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

Pair RDD行动操作

  和转化操作一样,所有基础RDD支持的行动操作也都在pair RDD上可用。另外,Pair RDD提供了一些额外的行动操作。

函数 作用 示例
countByKey 对每个键对应的元素分别计数 rdd.countByKey()
collectAsMap 将结果以映射表的形式返回 rdd.collectAsMap()
lookup(key) 返回指定键对应的所有值 rdd.lookup(3)

相关文章

  • 6.spark core之键值对操作

      键值对RDD(pair RDD)是spark中许多操作所需要的常见数据类型,通常用来进行聚合计算。 创建Pai...

  • spark—键值对操作

    1.JavaPairRDD背景键值对 RDD 通常用来进行聚合计算。先通过一些初始 ETL(抽取、转 化、装载)操...

  • 从零开始学习Spark(四)键值对操作

    键值对操作 Hadoop中,键值对是最基本的操作对象。同样,Spark中,针对键值对类型的RDD有非常丰富的API...

  • asp.net core 系列 10 配置configurati

    一. ASP.NET Core 中的配置概述 ASP.NET Core 中的应用配置是基于键值对,由config...

  • 3RDD创建

    1键值对rdd的创建 2常用的RDD键值对转换操作 常用的键值对转换操作有reduceByKey(func),gr...

  • 字典

    字典定义: 键值对 字典操作

  • RDD操作—— 键值对RDD(Pair RDD)

    键值对概述 “键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到。Spark操作中经常会用到“键值...

  • SparkRDD的键值对操作

    pairRDD Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为pair RDD...

  • spark PairRDD 键值对操作

    CombineByKey 基于键聚合 这是最基本的聚合操作, 很多封装的函数都是基于它, 但能用更方便的函数就不要...

  • 对于kvc和kvo的简单理解

    KVC(Key - value - coding)键值对代码,即以键值对的方式对属性进行操作。可以通过KVC机制去...

网友评论

    本文标题:6.spark core之键值对操作

    本文链接:https://www.haomeiwen.com/subject/tibyiftx.html