- Start the external shuffle service
This service must be started on every node that runs a Worker:
./sbin/start-shuffle-service.sh
- Start spark-shell with dynamic resource allocation enabled
Add the following configuration:
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.port=7337 \
spark-shell --master spark://spark-project-1:7077 \
--jars /opt/module/hive/lib/mysql-connector-java-5.1.34.jar \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.port=7337 \
--driver-memory 600m \
--executor-memory 600m \
--num-executors 1
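If you want to bound how many executors dynamic allocation can add or keep, the limits can also be set when launching spark-shell. The flags below are optional and the values are only illustrative, not taken from the setup above:
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.maxExecutors=2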
- After about 60 seconds, the logs show the executor being removed, and the executor process is gone
- Then write a word count program by hand; when the job is submitted, a new executor is requested dynamically and a new executor process appears (see the snippet below)
sc.setLogLevel("INFO")
val lines = sc.textFile("hdfs://spark-project-1:9000/testdate/wordcount.txt")
val words = lines.flatMap(_.split(" "))
val paris = words.map((_,1))
val counts = paris.reduceByKey(_ + _)
counts.collect()
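To keep the result in HDFS instead of only collecting it to the driver, a line like the following can be appended in the same spark-shell session (the output path here is just a placeholder):
counts.saveAsTextFile("hdfs://spark-project-1:9000/output/wordcount_result")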
- The whole job then completes, which shows that the external shuffle service plus dynamic resource allocation flow works end to end
- After another 60 seconds, the executor is released again
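The 60-second idle window observed above matches Spark's default spark.dynamicAllocation.executorIdleTimeout of 60s; if a different release delay is wanted, it can be overridden when launching spark-shell, for example:
--conf spark.dynamicAllocation.executorIdleTimeout=30s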