Common Scala Functions

Author: leobupt | Published 2018-01-30 18:33

    1. Common functions

    • takeWhile
    // takeWhile takes elements from the start of the list while the
    // predicate holds, and stops at the first element that fails it
    val s1 = List(1,2,3,4,10,20,30,40,5,6,7,8,50,60,70,80)
    val r1 = s1.takeWhile( _ < 10)
    r1: List[Int] = List(1, 2, 3, 4)
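    A point worth contrasting: takeWhile stops at the first failing element, while filter scans the whole list. A small comparison on the same list:

```scala
// same list as above
val s1 = List(1, 2, 3, 4, 10, 20, 30, 40, 5, 6, 7, 8, 50, 60, 70, 80)

// takeWhile stops at the first element that fails the predicate (10)
val taken = s1.takeWhile(_ < 10)

// filter scans the whole list and keeps every matching element
val filtered = s1.filter(_ < 10)
```

    Note that filter also finds the later 5, 6, 7, 8 that takeWhile never reaches.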
    
    • drop / dropRight
    // note: despite the original title, `it` here is a List, not an Iterator --
    // a real Iterator would be consumed by the first traversal
    val it = List.range(0, 10, 2).map { i => i.toString }
    it.drop(1).zip(it.dropRight(1))
    // res: List[(String, String)] = List((2,0), (4,2), (6,4), (8,6))
    
    • List
    // adding an element to a List of tuples
    it3 :+ ((1000, 2000))    // append to the end
    (1000, 2000) :: it3      // prepend to the head (:: binds to its right-hand operand)
    
    • reduceByKey
    // reduce does not run in the order produced by map;
    // if ordering matters, groupBy can be used instead
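    The idea can be sketched with plain Scala collections (a sketch only, not Spark's reduceByKey; pairs is a made-up dataset):

```scala
val pairs = List(("a", 1), ("b", 2), ("a", 3), ("b", 4))

// group by key first, then reduce each group's values;
// within each group the values keep their original order
val summed = pairs.groupBy(_._1).map { case (k, kvs) =>
  (k, kvs.map(_._2).sum)
}
```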
    
    • cogroup | join | groupByKey differences

    GitHub: reference link

    Join() returns a dataset of [key, leftValue, rightValue], where [key, leftValue] comes from one dataset and [key, rightValue] from the other.
    
    CoGroup() returns a dataset of [key, leftValues, rightValues]: the [key, leftValue] entries from one dataset are grouped into [key, leftValues], the [key, rightValue] entries from the other are grouped into [key, rightValues], and the two grouped entries are combined into [key, leftValues, rightValues].
    
    GroupByKey() returns a dataset of [key, values], where the [key, value] entries of one dataset are grouped together.
    
    Join(), GroupByKey() and CoGroup() all depend on Partition(). Both input datasets should be partitioned by the same key and into the same number of shards; otherwise a relatively costly repartitioning is performed.
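    The three semantics above can be modelled on plain Scala sequences (a simplified sketch, not the RDD implementations; left and right are made-up datasets):

```scala
val left  = Seq((1, "a"), (1, "b"), (2, "c"))
val right = Seq((1, "x"), (3, "y"))

// groupByKey: [key, values] within one dataset
val grouped = left.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

// join: [key, leftValue, rightValue] for every matching pair of entries
val joined = for { (k, lv) <- left; (k2, rv) <- right if k == k2 } yield (k, lv, rv)

// cogroup: [key, leftValues, rightValues] over the union of keys,
// keeping keys that appear on only one side (with an empty other side)
val keys = (left.map(_._1) ++ right.map(_._1)).distinct
val cogrouped = keys.map { k =>
  (k, left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2))
}
```

    Note how join drops keys 2 and 3 (no match on the other side), while cogroup keeps them with an empty value list.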
    

    join consists of two steps, a cogroup followed by a flatMap.
    (Figure "join计算流程" (join computation flow) omitted; quoted from join(otherRDD, numPartitions).)
    • Broadcast variables
    // After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once.
    // In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable
    // (e.g. if the variable is shipped to a new node later).
    // That is: broadcast variables let the programmer cache a read-only variable on each machine instead of shipping it with every task.
    // They can be used to give every node a copy of a large input dataset efficiently.
    // Spark also tries to distribute broadcast variables with efficient broadcast algorithms to reduce communication cost.
    
     scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
     broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    
     scala> broadcastVar.value
     res0: Array[Int] = Array(1, 2, 3)
    
    • foldLeft
    // accumulates from the left: 0 is the initial value, m acts as the accumulator

    scala> val numbers = (1 to 10).toList
    numbers: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // watch the run directly:
    scala> numbers.foldLeft(0) { (m: Int, n: Int) => println("m: " + m + " n: " + n); m + n }
    m: 0 n: 1
    m: 1 n: 2
    m: 3 n: 3
    m: 6 n: 4
    m: 10 n: 5
    m: 15 n: 6
    m: 21 n: 7
    m: 28 n: 8
    m: 36 n: 9
    m: 45 n: 10
    res0: Int = 55
    
    • Option
    scala> val myMap: Map[String, (String, Boolean)] = Map("key1" -> ("value", true))
    myMap: Map[String,(String, Boolean)] = Map(key1 -> (value,true))
    scala> val vs = myMap.get("key1")
    vs: Option[(String, Boolean)] = Some((value,true))
    
    // the value is a tuple; ways to extract the tuple's fields:
    
    // method 1: pattern match
    val (v2, s2) = vs match {
        case Some((v, s)) => (v, s)
        case _            => ("null", "null")
    }
    
    // method 2: map + getOrElse
    // if the Option is None, map is skipped, but the function chained after it still runs:
    val (v2, s2) = vs.map { case (s, b) => (s, b.toString) }.getOrElse((null, null))
    // val (v2, s2) = vs.map { case (s, b) => (s, b.toString) }.getOrElse(("null", "null"))
    
    // note: in method 2's first form, null is not a String, so s2 cannot call
    // String methods afterwards; the following examples help clarify null conversions
    // null cannot call toString, but None can
    
    scala> null.toString
    java.lang.NullPointerException
    scala> None.toString
    res42: String = None
    
    // the type of null, and how it can be used:
    scala> "null"
    res38: String = null
    scala> null
    res39: Null = null
    scala> null.asInstanceOf[String]
    res40: String = null
    scala> Array("a",null).mkString(",")
    res41: String = a,null
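    A third option (besides the pattern match and map/getOrElse above) is Option.fold, which takes the default first and the mapping function second, so the fallback stays a real String:

```scala
val myMap: Map[String, (String, Boolean)] = Map("key1" -> ("value", true))
val vs = myMap.get("key1")

// fold: use the default if None, otherwise apply the function to the value
val (v2, s2) = vs.fold(("null", "null")) { case (v, b) => (v, b.toString) }

// a missing key falls back to the default strings, never to null
val (v3, s3) = myMap.get("nope").fold(("null", "null")) { case (v, b) => (v, b.toString) }
```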
    
    • Option[Boolean]
    scala> val myMap: Map[String, Boolean] = Map("key1" -> true)
    myMap: Map[String,Boolean] = Map(key1 -> true)
    scala> val myMap2 = myMap + ("k2" -> false)
    
    // note the difference in the return types:
    scala> myMap2.get("k8").map(_.toString).getOrElse(null)
    res160: String = null
    
    scala> myMap2.get("k8").getOrElse(null)
    res161: Any = null
    
    
    • HashMap
    scala> import scala.collection.mutable
    import scala.collection.mutable

    scala> val map1 = mutable.HashMap[String, String]()
    map1: scala.collection.mutable.HashMap[String,String] = Map()
    
    scala> map1.put("a1","aa1")
    res104: Option[String] = None
    
    scala> map1
    res105: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1)
    
    scala> map1("a2") = "aa2"
    
    scala> map1
    res108: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1, a2 -> aa2)
    
    • immutable.Map
    // myMap is immutable: elements cannot be added to it in place
    scala> val myMap = Map("k1" -> true)
    myMap: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true)
    
    // but an immutable map can be combined with new entries, returning a new map
    scala> val myMap2 = myMap + ("k2" -> false)
    myMap2: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true, k2 -> false)
    
    scala> myMap2.get("k8").isEmpty
    res147: Boolean = true
    
    
    • sortBy
    // create the data locally to test the function
    scala> val data = List(3,1,90,3,5,12)
    data: List[Int] = List(3, 1, 90, 3, 5, 12)
     
    scala> val rdd = sc.parallelize(data)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
     
    scala> rdd.collect
    res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
     
    scala> rdd.sortBy(x => x).collect
    res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
     
    scala> rdd.sortBy(x => x, false).collect
    res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
     
    scala> val result = rdd.sortBy(x => x, false)
    result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
     
    // the default number of partitions here is 6
    scala> result.partitions.size
    res9: Int = 6
     
    // the number of partitions can also be set explicitly
    scala> val result = rdd.sortBy(x => x, false, 1)
    result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
     
    scala> result.partitions.size
    res10: Int = 1
    
    
    • sortByKey
    
    scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:25
    
    scala> val b = sc.parallelize(1 to a.count.toInt, 2)
    b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[86] at parallelize at <console>:27
    
    scala> b.collect
    res60: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val c = b.zip(a)
    c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[92] at zip at <console>:29
    
    // with a String key, sortByKey sorts lexicographically
    scala> a.zip(b).sortByKey().collect
    res61: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))
    
    // top returns the top N elements by descending key; no prior sortBy is needed, top sorts by itself
    scala> c.top(3)
    res63: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com))
    
    // sortByKey defaults to ascending order
    scala> c.sortByKey().collect
    res64: Array[(Int, String)] = Array((1,wyp), (2,iteblog), (3,com), (4,397090770), (5,test))
    
    scala> c.sortByKey(false).collect
    res66: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com), (2,iteblog), (1,wyp))
    
    // note: for an RDD of (k, v) pairs, top fails when v contains an Array (there is no implicit Ordering for arrays), but a plain numeric field is fine
    scala> val rdd2 = sc.parallelize(List((10, ("a", Array(1,2))), (9, ("b", Array(3,5))), (1, ("c", Array(6,0)))))
    rdd2: org.apache.spark.rdd.RDD[(Int, (String, Array[Int]))] = ParallelCollectionRDD[134] at parallelize at <console>:26
    
    scala> rdd2.top(1)
    <console>:29: error: No implicit Ordering defined for (Int, (String, Array[Int])).
           rdd2.top(1)
    
    // but a plain numeric value works
    scala> val rdd2 = sc.parallelize(List((10, ("a", 11)), (9, ("b", 10)), (100, ("c", 20))))
    rdd2: org.apache.spark.rdd.RDD[(Int, (String, Int))] = ParallelCollectionRDD[138] at parallelize at <console>:26
    
    scala> rdd2.top(2)
    res209: Array[(Int, (String, Int))] = Array((100,(c,20)), (10,(a,11)))
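    The missing-Ordering error above can be worked around by supplying an explicit Ordering that looks only at the key; Spark's top accepts one as an implicit second parameter list (e.g. rdd2.top(1)(Ordering.by(_._1)); a type annotation may be needed for inference). The same idea on a plain Scala list:

```scala
val data = List(
  (10, ("a", Array(1, 2))),
  (9,  ("b", Array(3, 5))),
  (1,  ("c", Array(6, 0)))
)

// order only by the Int key, ignoring the un-orderable Array payload
val byKey = Ordering.by[(Int, (String, Array[Int])), Int](_._1)
val topKeys = data.sorted(byKey.reverse).take(2).map(_._1)
```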
    
    • Array.grouped
    // split an array into fixed-size groups (here, groups of 3 elements):
    scala> val a = (1 to 9).toArray
    a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    scala> a.grouped(3).toArray
    res178: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
    
    
    • zip
    // before: summing five tab-separated counters field by field
    reduceByKey{ case ((s1, c1), (s2, c2)) =>
        val n1 = s1.split("\t")(0).toLong + s2.split("\t")(0).toLong
        val n2 = s1.split("\t")(1).toLong + s2.split("\t")(1).toLong
        val n3 = s1.split("\t")(2).toLong + s2.split("\t")(2).toLong
        val n4 = s1.split("\t")(3).toLong + s2.split("\t")(3).toLong
        val n5 = s1.split("\t")(4).toLong + s2.split("\t")(4).toLong
        val statusTrueNumStr = Array(n1, n2, n3, n4, n5).mkString("\t")
        val count = c1 + c2
        (statusTrueNumStr, count)
    }
    // after, using zip: sum the counter arrays elementwise
    val rddLastOneWeek2 = rddLastOneWeek.map{ case (_, bigVersion, arrStatusTrueNum, isStable, count) =>
        ((bigVersion, isStable), (arrStatusTrueNum, count))
    }.reduceByKey{ case ((arr1, count1), (arr2, count2)) =>
        val arr = arr1.zip(arr2).map{ case (x, y) => x + y }
        val count = count1 + count2
        (arr, count)
    }
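    The elementwise-sum core of the zip version can be checked in isolation (arr1 and arr2 are made-up counter arrays standing in for arrStatusTrueNum):

```scala
// elementwise sum of two equal-length counter arrays via zip
def addCounters(a: Array[Long], b: Array[Long]): Array[Long] =
  a.zip(b).map { case (x, y) => x + y }

val arr1 = Array(1L, 2L, 3L, 4L, 5L)
val arr2 = Array(10L, 20L, 30L, 40L, 50L)
val summedArr = addCounters(arr1, arr2)
```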
    
    • zipWithIndex
    scala> val l = List(1, 2, 3, 4)
    l: List[Int] = List(1, 2, 3, 4)
    scala> l.zipWithIndex
    res22: List[(Int, Int)] = List((1,0), (2,1), (3,2), (4,3))
    

    Source: https://www.haomeiwen.com/subject/lxixpxtx.html