美文网首页
【2019-06-23】键值对操作

【2019-06-23】键值对操作

作者: BigBigFlower | 来源:发表于2019-07-31 20:54 被阅读0次

    键值对RDD是spark中许多操作所需要的常见数据类型。键值对RDD通常用来进行聚合计算,一般要先通过一些初始的ETL(抽取、转化、装载)操作来将数据转化为键值对形式。
    spark为包含键值对类型的RDD提供了一些专有的操作,这些RDD被称为pairRDD。
    创建pairRDD

    #在python中使用第一个单词作为键创建出一个pairRDD
    lines = sc.parallelize(["hello world","hi"])
    pairs=lines.map(lambda x:(x.split(" ")[0],x))
    
    #在Scala中使用第一个单词作为键创建出一个pairRDD
    val lines=sc.parallelize(List("pandas","i like pands"))
    val pair=lines.map(x=>(x.split(" ")(0),x))
    

    一个pairRDD的转化操作
    reduceByKey(func)合并具有相同键的值。rdd.reduceByKey(x,y)=>x+y
    groupByKey()对具有相同键的值进行分组。rdd.groupByKey()
    combineByKey(createCombiner,meregeValue,mergeCombineers,partitioner)使用不同的返回类型合并具有相同相同键的值。
    mapValue(func)对pairRDD中的每个值应用一个函数而不改变键。rdd.mapValues(x=x+1)
    flatMapvalues(func) 对pairRdd中的每个值应用一个函数而不改变键。 rdd.mapValues(x=>x+1)
    keys. 返回一个仅包含键的RDD。rdd.keys
    values. 返回一个仅包含值的RDD。rdd.values
    sortByKey() 返回一个根据键排序的RDD。rdd.sortBykey()

    针对两个pairRDD的转化操作
    subtractByKey 删掉RDD中键与otherRDD中的键相同的元素 rdd.subtractByKey(other)
    join 对两个RDD内连接。 rdd .join(other)
    rightOuterJoin. 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)。 rdd.rightOuterJoin(other)
    leftOuterJoin. 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)。 rdd.leftOuterJoin(other)
    cogroup将两个RDD中拥有相同键的数据分组到一起。 rdd.cogroup()

    #使用python对第二个元素进行筛选
    rdd_1=sc.parallelize([1,2,3])
    rdd1_pairs=rdd_1.map(lambda x:(x*x,x))
    rdd1_pairs.take(3)
    //[(1, 1), (4, 2), (9, 3)]
    rdd_filter=rdd1_pairs.filter(lambda keyValue: keyValue[1]<3)
    rdd_filter.take(2)
    //[(1, 1), (4, 2)]
    
    //使用Scala对第二个元素进行筛选
    val rdd_1=sc.parallelize(List(1,2,3))
    val rdd1_pairs=rdd_1.map(x=>(x,x*x))
    rdd1_pairs.take(3)
    //Array[(Int, Int)] = Array((1,1), (2,4), (3,9))
    val rdd1_filter=rdd1_pairs.filter{case (key,value)=> value<3}
    rdd1_filter.take(3)
    //Array[(Int, Int)] = Array((1,1))
    

    聚合操作
    当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作。

    # python 用reduceByKey()和mapValues()计算每个键对应的平均值
    #.rdd1_pairs.    [(1, 1), (4, 2), (9, 3)]
    
    rdd_reduce=rdd1_pairs.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd_reduce.take(3)
    //[(4, (2, 1)), (9, (3, 1)), (1, (1, 1))]
    
    
    //scala用reduceByKey()和mapValues()计算每个键对应的平均值
    //rdd1_pairs   Array((1,1), (2,4), (3,9))
    val rdd1_reduce=rdd1_pairs.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
    rdd1_reduce.take(3)
    // Array[(Int, (Int, Int))] = Array((2,(4,1)), (1,(1,1)), (3,(9,1)))  
    
    
    #用python实现单词计数
    rdd=sc.textFile("/user/test/read.txt")
    words=rdd.flatMap(lambda x:x.split(" "))
    result=words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
    result.take(10)
    #[(u'', 2), (u'feed', 1), (u'all', 1), (u'love', 1), (u'sky', 1), (u'comfortable,', 1), (u'liked', 1), (u'down', 1), (u'view,', 1)]
    
    
    //scala 实现单词计数
    val input=sc.textFile("/user/test/read.txt")
    val words=input.flatMap(x => x.split(" "))
    val result=words.map(x=>(x,1)).reduceByKey((x,y)=>x+y)
    result.take(2)
    // Array[(String, Int)] = Array((She,1), (comfortable,,1))     
    

    数据分区
    spark程序可以通过控制RDD分区方式来减少通信开销,当数据集多次在类似于连接操作这种基于键的操作中使用时,分区才有帮助。
    获取RDD的分区方式

    //获取RDD分区方式
    //创建一个(Int,Int)对组成的RDD
    import org.apache.spark
    val pairs=sc.parallelize(List((1,1),(2,2),(3,3)))
    pairs.partitioner
    val partitioned=pairs.partitionBy(new spark.HashPartitioner(2))
    partitioned.partitioner
    

    PageRank

    //scala PangRank
    //假设相邻页面列表以Spark ObjectFile的形式存储
    val links=sc.objectFiles[(String,Seq[String])]("links")
                                        .partitionBy(new HashPartitioner(100))
                                         .persist()
    //每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD的分区方式会和"links"一样
    var ranks=links.mapValues(v=>1.0)
    
    //运行10轮的PanhRank迭代
    for(i<-0 until 10){
    val contributions=links.join(ranks).faltMap{
    case (pageId,(links,rank))=>
    links.map(dest=>(dest,rank/link.size))
    }
    ranks=contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)
    }
    //最终排名
    ranks.saveAsTextFile("ranks")
    

    自定义分区方式
    实现自定义的分区,需要继承org.apache.spark.Partitioner类,并实现下面三个方法:
    (1)numPartitions:Int:返回创建出来的分区数
    (2)getPartition(key,Any):Int返回给定键的分区编号
    (3)equals():Java判断相等性的标准方法
    基于域名的分区器,这个分区器只对URL中的域名部分求哈希

    //scala 自定义分区方式
    class DomainNamePartitioner(numParts:Int) extends Partitioner{
    override def numPartitions:Int=numParts
    override def getPartition(key,Any):Int={
    val domain=new Java.net.URL(key,toString).getHost()
    val code=(domain.hashCode%numPartitions)
    if(code<0):
    code+numPartitions
    }else{code}
    }
    override def equals(other:Any):Boolean=other match{
    case dnp:DomainNamePartitioner=>dnp.numPartitions==numPartitions
    case_=> false
    }
    }
    
    # python自定义分区方式
    #pathon 不需要扩展Partitioner 类,而是把一个特定的哈希函数作为额外的参数传给RDD.partitionBy()函数
    import urlparse
    def hash_domain(url):
      return hash(urlparse.urlparse(url).netloc)
    rdd.partitionBy(20,hash_domain) #创建20个分区
    

    相关文章

      网友评论

          本文标题:【2019-06-23】键值对操作

          本文链接:https://www.haomeiwen.com/subject/ghaiqctx.html