美文网首页
Spark基础系列之七--综合案例

Spark基础系列之七--综合案例

作者: 微生活_小阿楠 | 来源:发表于2020-05-07 08:26 被阅读0次

传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门

一、案例1:求TOP的值

//方法一:
import org.apache.spark.{SparkConf, SparkContext}

object top_N{
  def main(args:Array[String]):Unit = {
    val input = "src/top/file1.txt"
    val input2 = "src/top/file2.txt"
    //setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
    //setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
    val conf = new SparkConf().setAppName("top_N").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(input)
    val textFile2 = sc.textFile(input2)

    val rdds = Seq(textFile,textFile2)
//repartition(1)可以把什么两个分区给合并成1个
    val bigRdd = sc.union(rdds).repartition(1)

    val arr = Array()
    val bigRdd1 = bigRdd.map(line=>line.split(",")(2)).map(line=>(line.toInt,"")).sortByKey(flase).map(x=>x._1).take(5).foreach(println)
  }
}

//方法二
object TopN{
  def main(args:Array[String]):Unit = {
    //屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
    //setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("/usr/local/spark/mycode/file1.txt")
    //排序
    val result = lines.map(_.split(",")(2)).map(x => (x.toInt,"")).sortByKey(false)
    result.map(x=>x._1).take(5).foreach(x=>{println(x)})
  }
}



二、案例2:求最大最小值

//方法一:
import org.apache.spark.{SparkConf, SparkContext}

object num{
  def main(args:Array[String]):Unit = {
    val input = "src/num/file1.txt"
    val input2 = "src/num/file2.txt"
    //setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
    //setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
    val conf = new SparkConf().setAppName("num").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(input)
    val textFile2 = sc.textFile(input2)

    val rdds = Seq(textFile,textFile2)
    val bigRdd = sc.union(rdds)

    val bigRdd1 = bigRdd.map(line => line.toInt)
    println("最大值: "+bigRdd1.max)
    println("最小值: "+bigRdd1.min)
  }
}

//方法二
import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin{
  def main(args:Array[String]):Unit = {
      //屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
    //setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
    val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("/usr/local/spark/mycode/file1.txt")
    val result = lines.map(line=>("",line.toInt))
    val max = result.reduceByKey((a,b)=>if(a>b) a else b).values.foreach(println)
    val min = result.reduceByKey((a,b)=>if(a<b) a else b).values.foreach(println)
  }
}


三、案例3:文件排序

import org.apache.spark.{SparkConf, SparkContext}
object paixu{
  def main(args:Array[String]):Unit = {
    val input = "src/paixu/file1.txt"
    val input2 = "src/paixu/file2.txt"
    val input3 = "src/paixu/file3.txt"
    //setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
    //setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
    val conf = new SparkConf().setAppName("num").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(input)
    val textFile2 = sc.textFile(input2)
    val textFile3 = sc.textFile(input3)

    val rdds = Seq(textFile,textFile2,textFile3)
    val bigRdd = sc.union(rdds).repartition(1)

    var a = 0
    val bigRdd1 = bigRdd.map(line => (line.toInt,"")).sortByKey().map(x => {
      a += 1
      (a,x._1)
    }).foreach(println)


  }
}

相关文章

网友评论

      本文标题:Spark基础系列之七--综合案例

      本文链接:https://www.haomeiwen.com/subject/blccghtx.html