美文网首页
50任务调度

50任务调度

作者: 文茶君 | 来源:发表于2020-03-15 20:38 被阅读0次

1.补充算子

transformations

mapPartitionWithIndex

类似于mapPartitions,除此之外还会携带分区的索引值。

repartition

增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle)

coalesce

coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
true为产生shuffle,false不产生shuffle。默认是false。
如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)

groupByKey

作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
zip
将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。

zipWithIndex

该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。

Action

countByKey

作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。

countByValue

根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

reduce

根据聚合逻辑聚合数据集中的每个元素。

3.Spark-Submit提交参数

Options:

--master
MASTER_URL, 可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
--deploy-mode
DEPLOY_MODE, Driver程序运行的地方,client或者cluster,默认是client。
--class
CLASS_NAME, 主类名称,含包名
--jars
逗号分隔的本地JARS, Driver和executor依赖的第三方jar包
--files
用逗号隔开的文件列表,会放置在每个executor工作目录中
--conf
spark的配置属性
--driver-memory
Driver程序使用内存大小(例如:1000M,5G),默认1024M
--executor-memory
每个executor内存大小(如:1000M,2G),默认1G

Spark standalone with cluster deploy mode only:

--driver-cores
Driver程序的使用core个数(默认为1),仅限于Spark standalone模式
Spark standalone or Mesos with cluster deploy mode only:
--supervise
失败后是否重启Driver,仅限于Spark alone或者Mesos模式
Spark standalone and Mesos only:
--total-executor-cores
executor使用的总核数,仅限于SparkStandalone、Spark on Mesos模式

Spark standalone and YARN only:

--executor-cores
每个executor使用的core数,Spark on Yarn默认为1,standalone默认为worker上所有可用的core。

YARN-only:

--driver-cores
driver使用的core,仅在cluster模式下,默认为1。
--queue
QUEUE_NAME 指定资源队列的名称,默认:default
--num-executors
一共启动的executor数量,默认是2个。

4.资源调度源码分析

资源请求简单图



资源调度Master路径:


路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala

提交应用程序,submit的路径:


路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala
总结:
1.Executor在集群中分散启动,有利于task计算的数据本地化。
2.默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
3.如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
4.默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
结论演示
使用Spark-submit提交任务演示。也可以使用spark-shell
默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。
./spark-submit
--master spark://node1:7077
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
10000

2.在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit
--master spark://node1:7077
--executor-cores 1
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
10000

3.内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit
--master spark://node1:7077
--executor-cores 1
--executor-memory 3g
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
10000

4.--total-executor-cores集群中共使用多少cores
注意:一个进程不能让集群多个节点共同启动。

./spark-submit
--master spark://node1:7077
--executor-cores 1
--executor-memory 2g
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
10000

5.任务调度源码分析

Action算子开始分析
任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。
划分stage,以taskSet形式提交任务DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:


6.二次排序


SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");

JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
           String[] splited = line.split(" ");
           int first = Integer.valueOf(splited[0]);
           int second = Integer.valueOf(splited[1]);
           SecondSortKey secondSortKey = new SecondSortKey(first,second);
           return new Tuple2<SecondSortKey, String>(secondSortKey,line);
    }
});

pairSecondRDD.sortByKey(false).foreach(new  
               VoidFunction<Tuple2<SecondSortKey,String>>() {
    
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
             System.out.println(tuple._2);
    }
});

public class SecondSortKey  implements Serializable,Comparable<SecondSortKey>{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private int first;
    private int second;
    public int getFirst() {
        return first;
    }
    public void setFirst(int first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }
    @Override
    public int compareTo(SecondSortKey o1) {
        if(getFirst() - o1.getFirst() ==0 ){
            return getSecond() - o1.getSecond();
        }else{
            return getFirst() - o1.getFirst();
        }
    }
}

7分组取topN和topN

SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("scores.txt");

JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {

/**
 * 
 */
private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, Integer> call(String str) throws Exception {
    String[] splited = str.split("\t");
    String clazzName = splited[0];
    Integer score = Integer.valueOf(splited[1]);
    return new Tuple2<String, Integer> (clazzName,score);
        }
});

pairRDD.groupByKey().foreach(new 
            VoidFunction<Tuple2<String,Iterable<Integer>>>() {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
    String clazzName = tuple._1;
    Iterator<Integer> iterator = tuple._2.iterator();
    
    Integer[] top3 = new Integer[3];
    
    while (iterator.hasNext()) {
         Integer score = iterator.next();

           for (int i = 0; i < top3.length; i++) {
         if(top3[i] == null){
                top3[i] = score;
                break;
          }else if(score > top3[i]){
                 for (int j = 2; j > i; j--) {
                top3[j] = top3[j-1];
                 }
                top3[i] = score;
                break;
         }
       }
 }
 System.out.println("class Name:"+clazzName);
 for(Integer sscore : top3){
      System.out.println(sscore);
  }
}
}); 

8.SparkShell的使用

概念:
SparkShell是Spark自带的一个快速原型开发工具,也可以说是Spark的scala REPL(Read-Eval-Print-Loop),即交互式shell。支持使用scala语言来进行Spark的交互式编程。
使用:
启动Standalone集群,./start-all.sh
在客户端上启动spark-shell:

./spark-shell --master spark://node1:7077

启动hdfs,创建目录spark/test,上传文件wc.txt

启动hdfs集群:
start-all.sh
创建目录:
hdfs dfs -mkdir -p /spark/test
上传wc.txt
hdfs dfs -put /root/test/wc.txt /spark/test/
运行wordcount
sc.textFile("hdfs://node1:9000/spark/test/wc.txt")
.flatMap(.split(" ")).map((,1)).reduceByKey(+).foreach(println)

相关文章

  • 50任务调度

    1.补充算子 transformations mapPartitionWithIndex 类似于mapPartit...

  • iOS调度组的使用

    结果 :2016-09-13 13:50:23.992 12-调度组[2598:1484594] 任务22016-...

  • linux定时任务

    一 、简介 Linux下的任务调度分为两类,系统任务调度和用户任务调度 系统任务调度:系统需要定期执行的任务,比如...

  • Linux 系统之crontab命令

    Linux下的任务调度分为两类,系统任务调度 和 用户任务调度。 系统任务调度:系统周期性所要执行的工作,比如写缓...

  • 任务调度

    http://www.cnblogs.com/langtianya/archive/2013/05/15/3079...

  • 任务调度

    介绍 在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求: 任务逻辑相...

  • 任务调度

    主要有3种方案:数据库扫表;小顶堆;时间轮。 数据库扫表 延迟比较大 小顶堆 首先维持一个小顶堆,即最快需要执行的...

  • 任务调度

    用了一个计算机类词汇作为标题。 在每天的工作生活中,会有各式各样的事情,有的重要,有的不重要,有的紧急,有的不太紧...

  • Linux任务调度

    crond任务调度 crontab用于定时任务的设置。任务调度:是指系统在某个时间执行的特定的命令或程序。任务调度...

  • crond任务调度(定时任务调度)

    基本语法crontab [选项]选项-e : 编辑crontab定时任务-l : 查询crontab定时任务...

网友评论

      本文标题:50任务调度

      本文链接:https://www.haomeiwen.com/subject/cccvjhtx.html