传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门
一、案例1:求TOP的值

//方法一:
import org.apache.spark.{SparkConf, SparkContext}
object top_N{
def main(args:Array[String]):Unit = {
val input = "src/top/file1.txt"
val input2 = "src/top/file2.txt"
//setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
//setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
val conf = new SparkConf().setAppName("top_N").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile(input)
val textFile2 = sc.textFile(input2)
val rdds = Seq(textFile,textFile2)
//repartition(1)可以把什么两个分区给合并成1个
val bigRdd = sc.union(rdds).repartition(1)
val arr = Array()
val bigRdd1 = bigRdd.map(line=>line.split(",")(2)).map(line=>(line.toInt,"")).sortByKey(flase).map(x=>x._1).take(5).foreach(println)
}
}
//方法二
object TopN{
def main(args:Array[String]):Unit = {
//屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
//setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
val conf = new SparkConf().setAppName("TopN").setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("/usr/local/spark/mycode/file1.txt")
//排序
val result = lines.map(_.split(",")(2)).map(x => (x.toInt,"")).sortByKey(false)
result.map(x=>x._1).take(5).foreach(x=>{println(x)})
}
}
二、案例2:求最大最小值

//方法一:
import org.apache.spark.{SparkConf, SparkContext}
object num{
def main(args:Array[String]):Unit = {
val input = "src/num/file1.txt"
val input2 = "src/num/file2.txt"
//setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
//setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
val conf = new SparkConf().setAppName("num").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile(input)
val textFile2 = sc.textFile(input2)
val rdds = Seq(textFile,textFile2)
val bigRdd = sc.union(rdds)
val bigRdd1 = bigRdd.map(line => line.toInt)
println("最大值: "+bigRdd1.max)
println("最小值: "+bigRdd1.min)
}
}
//方法二
import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin{
def main(args:Array[String]):Unit = {
//屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
//setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("/usr/local/spark/mycode/file1.txt")
val result = lines.map(line=>("",line.toInt))
val max = result.reduceByKey((a,b)=>if(a>b) a else b).values.foreach(println)
val min = result.reduceByKey((a,b)=>if(a<b) a else b).values.foreach(println)
}
}
三、案例3:文件排序

import org.apache.spark.{SparkConf, SparkContext}
object paixu{
def main(args:Array[String]):Unit = {
val input = "src/paixu/file1.txt"
val input2 = "src/paixu/file2.txt"
val input3 = "src/paixu/file3.txt"
//setAppName("top_N"),可以在网页版中以top_N这个名称看到该程序的情况
//setMaster("local"),设置为本地local。如果是分布式,就改为ip地址
val conf = new SparkConf().setAppName("num").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile(input)
val textFile2 = sc.textFile(input2)
val textFile3 = sc.textFile(input3)
val rdds = Seq(textFile,textFile2,textFile3)
val bigRdd = sc.union(rdds).repartition(1)
var a = 0
val bigRdd1 = bigRdd.map(line => (line.toInt,"")).sortByKey().map(x => {
a += 1
(a,x._1)
}).foreach(println)
}
}
网友评论