
Spark 2.4 Operations

Author: Helen_Cat | Published 2018-11-28 15:33

    sc.textFile()
    1. When reading from the local filesystem, prefix the path with file:;
       when reading from HDFS, prefix it with hdfs:.
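
    For example (a minimal sketch; the paths below are hypothetical placeholders):

    // hypothetical paths, only to illustrate the two prefixes
    val localRdd = sc.textFile("file:/home/user/data/input.txt")
    val hdfsRdd  = sc.textFile("hdfs://namenode:9000/data/input.txt")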

    2. If you hit the following exception:

     Exception in thread "main" java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
    
    

    this is caused by the Hadoop version that Spark depends on by default (2.6.5),
    which is too old here; bump it to 2.7.2:

    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.2"
    // https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common
    libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.7.2"
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.2"
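
    For context, these overrides sit alongside the Spark dependency itself in build.sbt. A minimal sketch (the Scala and Spark version numbers here are assumptions, adjust them to your project):

    // build.sbt -- minimal sketch; version numbers are assumptions
    scalaVersion := "2.11.12"

    // spark-core transitively pulls in Hadoop 2.6.5, hence the explicit
    // hadoop-* overrides shown above
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"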
    
    

    Some RDD operator examples

    package com.zh.all
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object algorithm1 {
            def main(args: Array[String]): Unit = {

                // local mode on all cores, with Kryo serialization
                val conf = new SparkConf()
                    .setMaster("local[*]")
                    .setAppName("algorithm1")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

                val sc = new SparkContext(conf)
    
    
    //            val arr=Seq(34,24,56,78,10,29)
    //            var arrRDD=sc.parallelize(arr)
    //            val sum =arrRDD.reduce(_+_)
    //            println(sum)
                // Note: a Map keeps only the last value for each key, so the
                // duplicate "red"/"blue"/"yellow" entries below overwrite each other.
                val hashM = Map("red" -> "#FF0000",
                    "azure" -> "#F0FFFF",
                    "peru" -> "#CD853F",
                    "blue" -> "#0033FF",
                    "yellow" -> "#FFFF00",
                    "red" -> "0000",
                    "blue" -> "45",
                    "yellow" -> "#FFFF00",
                    "red" -> "#FF0000")

    //            val mapRdd = sc.parallelize(hashM.toSeq)  // parallelize expects a Seq, not a Map
                // 80input.txt is assumed to hold whitespace-separated tokens,
                // each token being "#"-separated numbers, e.g. "1#2#3 4#5"
                val zpath = "file:/home/muller/Documents/scalalab/sparkAngorothm/src/main/resources/80input.txt"

                val rawRdd = sc.textFile(zpath)

                val linerdd = rawRdd.flatMap(line => line.split(" "))
                // sum the "#"-separated numbers inside each token
                val sumrDD = linerdd.map(token => token.split("#").map(_.toDouble).reduce(_ + _))
                sumrDD.foreach(line => {
                    println(line)
                    println("*************")
                })
    //            linerdd.flatMap( line=>
    //                    {
    //                        val keys= line.split("#")
    //                        val sum =keys.map(key=>key.toDouble).reduce(_+_)
    //                        sum
    //                    }
    //
    //            )
    //            rawRdd.saveAsTextFile("./out.txt")
    
                val seqMap = Seq("red" -> "#FF0000", "yellow" -> "#FFFF00", "azure" -> "#F0FFFF",
                "peru" -> "#CD853F",
                "blue" -> "#0033FF",
                "yellow" -> "#FFFF00",
                "red" -> "0000",
                "blue" -> "45",
                "yellow" -> "#FFFF00",
                "red" -> "#FF0000")
    
    //            rawRdd.map{
    //                line=>
    //                    println("he")
    //                    println(rawRdd)
    //            }
                val seqRdd = sc.parallelize(seqMap)

                seqRdd.foreach(line => print(line._2))
                // concatenate all values that share the same key
                val newRdd = seqRdd.reduceByKey(_ + _)

                newRdd.foreach(line => print(line._1 + "   " + line._2 + "\n"))
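                // With seqMap above, the merged values come out roughly as
                // (concatenation order may vary across runs):
                //   red    -> "#FF00000000#FF0000"
                //   yellow -> "#FFFF00#FFFF00#FFFF00"
                //   blue   -> "#0033FF45"
                //   azure  -> "#F0FFFF"
                //   peru   -> "#CD853F"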
               // seqRdd.map(line=> print(line._2))
                //    (line._1,line._2)) //.reduceByKey(_.mkString(_)).map(lne=>print(lne))
                println("spark running")
    
                val data=List((1,3),(1,2),(1,4),(2,3))
                val rdd=sc.parallelize(data, 2)
    
                // combOp merges values across different partitions;
                // both a and b have the type of zeroValue
                def combOp(a: String, b: String): String = {
                    println("combOp: " + a + "\t" + b)
                    a + b
                }
                // seqOp merges values within a single partition;
                // a has the type of zeroValue, b has the type of the original value
                def seqOp(a: String, b: Int): String = {
                    println("seqOp: " + a + "\t" + b)
                    a + b
                }
                rdd.foreach(println)
                // zeroValue: the neutral initial value; it fixes the result value type
                //            and takes part in the computation
                // seqOp:     merges values within the same partition
                // combOp:    merges values across partitions
                val aggregateByKeyRDD = rdd.aggregateByKey("100")(seqOp, combOp)
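
                // A rough trace, assuming parallelize splits the data into
                // [(1,3),(1,2)] and [(1,4),(2,3)] (actual split and merge order may vary):
                //   partition 0, key 1: seqOp("100", 3) = "1003"; seqOp("1003", 2) = "10032"
                //   partition 1, key 1: seqOp("100", 4) = "1004"
                //   partition 1, key 2: seqOp("100", 3) = "1003"
                //   combOp, key 1:      "10032" + "1004" = "100321004"
                // so the result is roughly (1, "100321004") and (2, "1003")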
    
    
                Thread.sleep(1)
                aggregateByKeyRDD.foreach(line => print(line._1 + "   " + line._2 + "\n"))

                sc.stop()
            }
    }
    
    
    
    
