SparkWC.scala
package day06
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWC {
def main(args: Array[String]): Unit = {
// 配置信息类
val conf: SparkConf = new SparkConf().setAppName("SparkWC")//.setMaster("local[*]")
// 上下文对象
val sc: SparkContext = new SparkContext(conf)
// 读取数据
val lines = sc.textFile(args(0))
// 处理数据
val words: RDD[String] = lines.flatMap(_.split(" "))
val paired: RDD[(String, Int)] = words.map((_,1))
val reduced: RDD[(String, Int)] = paired.reduceByKey(_+_)
val res: RDD[(String, Int)] = reduced.sortBy(_._2, false)
// 保存
res.saveAsTextFile(args(1))
// println(res.collect().toBuffer)
// 结束任务
sc.stop()
}
}
//打包上传
image.png
#cd data
#vi wc1.log
hello tom
hello jerry
hello tom
hello kitty
hello tom
hello jerry
vi wc2.log
hello tom
hello jerry
hello lilei
hello hanmeimei
hello tom
hello tom
hello jerry
hello tom
#vi wc3.log
hello tom
hello jerry
hello lilei
hello hanmeimei
hello tom
hello tom
hello jerry
hello tom
# hdfs dfs -mkdir /wc
# hdfs dfs -put ~/data/wc1.log /wc
# hdfs dfs -put ~/data/wc2.log /wc
# hdfs dfs -put ~/data/wc3.log /wc
#cd training/spark/bin
# ./spark-submit --class day06.SparkWC spark://192.168.56.21:7077 --executor-memory 1g --total-executor-cores 2 /root/wc.jar hdfs://192.168.56.21:9000/wc hdfs://192.168.56.21:9000/output
网友评论