-
先启动master,之后启动worker
master会将worker的信息保存起来,worker会向master定期发送心跳。 -
启动SparkSubmit(driver)
(通过一个脚本启动:bini\spark-submit --master --executor --memory 2g --exector-cores 10 ------>指定启动需要每台worker的占用内存为2G,总共需要CPU核心数为10核)
eg: -
sparksubmit会向master申请资源
master会检查剩余的资源是否满足条件,如果满足条件,会向worker进行rpc通信,让worker启动exector并将分配的参数传递给worker。 -
exector启动之后,worker主动连接sparksubmit(drver)(通过master->worker->exector进而exector知道driver在哪里)
5.driver通过网络将任务分发给worker,exector执行任务。
image.png
在spark上提交第一个scala的wordcount程序
object ScalaWorldCount {
def main(args: Array[String]): Unit = {
//创建spark执行的入口
val conf = new SparkConf()
val sc =new SparkContext(conf)
//指定以后从哪里读取数据创建RDD(弹性分布式数据集)
// sc.textFile(args(0))
// .flatMap(_.split(" "))
// .map((_,1)).reduceByKey(_+_)
// .sortBy(_._2,false)
// .saveAsTextFile(args(1))
//指定从哪里读取数据创建RDD(弹性分布式数据集)
val lines: RDD[String] = sc.textFile(args(0))
//切片,压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//将单词和1进行组合
val wordAndOne: RDD[(String, Int)] = words.map((_,1))
//按key进行聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2,false)
//将排序结果保存到磁盘
sorted.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
通过maven的package指令将程序打成jar包并上传到服务器上。
通过脚本将任务提交到spark上
spark-submit --master spark://spark00:7077 --class org.xrw.spark.ScalaWorldCount /myJar/original-spark-test-1.0-SNAPSHOT.jar hdfs://spark00:9000/input/word hdfs://spark00:9000/output
打开浏览器,查看程序运行状况
image.png
网友评论