美文网首页
Spark基本工作原理与RDD

Spark基本工作原理与RDD

作者: ibunny | 来源:发表于2017-03-27 15:48 被阅读369次

    1.11 Spark架构原理

    Spark架构原理.png

    driver向worker进程提交资源请求,worker会启动多个executor进程为driver分配资源,executor启动后会向driver进行反注册,以便driver知道自己启动的资源的情况。
    driver向executor提交task(map/reduce等),executor启动多个task线程来执行转换/动作算子。

    1.12 创建RDD

    Spark Core提供了三种创建RDD的方式:

    • 使用程序中的集合创建RDD
    • 使用本地文件创建RDD
    • 使用HDFS文件创建RDD
    1. 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。
    1. 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件。
    2. 使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。

    并行化集合创建RDD

    如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。

    package cn.spark.study.core;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    /**
     * 并行化集合创建RDD
     * 案例:累加1到10
     * @author Administrator
     *
     */
    public class ParallelizeCollection {
        
        public static void main(String[] args) {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("ParallelizeCollection")
                    .setMaster("local");  
            
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 要通过并行化集合的方式创建RDD,那么就调用SparkContext以及其子类,的parallelize()方法
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
            
            // 执行reduce算子操作
            // 相当于,先进行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10。。。以此类推
            int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer num1, Integer num2) throws Exception {
                    return num1 + num2;
                }
                
            });
            
            // 输出累加的和
            System.out.println("1到10的累加和:" + sum);  
            
            // 关闭JavaSparkContext
            sc.close();
        }
    }
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
     * @author Administrator
     */
    object ParallelizeCollection {
      
      def main(args: Array[String]) {
        val conf = new SparkConf()
            .setAppName("ParallelizeCollection")
            .setMaster("local")
        val sc = new SparkContext(conf)
        
        val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val numberRDD = sc.parallelize(numbers, 5)  
        val sum = numberRDD.reduce(_ + _)  
        println("1到10的累加和:" + sum)  
      }
    }
    

    调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10)

    使用本地文件和HDFS创建RDD

    Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。

    有几个事项是需要注意的:

    1. 如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。
    2. Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。
    3. Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。

    Spark的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特例的方法来创建RDD:

    1. SparkContext.wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回<filename, fileContent>组成的pair,作为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每个元素就是文件中的一行文本。
    2. SparkContext.sequenceFile[K, V]()方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFilekeyvalue的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritableText等。
    3. SparkContext.hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConfInputFormatClass、Key和Value的Class。
    4. SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    
    /**
     * 使用本地文件创建RDD
     * 案例:统计文本文件字数
     * @author Administrator
     *
     */
    public class LocalFile {
        
        public static void main(String[] args) {
            // 创建SparkConf
            SparkConf conf = new SparkConf()
                    .setAppName("LocalFile")
                    .setMaster("local"); 
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 使用SparkContext以及其子类的textFile()方法,针对本地文件创建RDD
            JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
    
            // 统计文本文件内的字数
            JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(String v1) throws Exception {
                    return v1.length();  // 每一行的字数
                }
                
            });
            
            int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
                
            });
            
            System.out.println("文件总字数是:" + count);  
            
            // 关闭JavaSparkContext
            sc.close();
        }
    }
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    
    /**
     * 使用HDFS文件创建RDD,打包为jar包后spark-submit
     * 案例:统计文本文件字数
     * @author Administrator
     *
     */
    public class HDFSFile {
        
        public static void main(String[] args) {
            // 创建SparkConf
            // 修改:去除setMaster()设置,修改setAppName()
            SparkConf conf = new SparkConf()
                    .setAppName("HDFSFile"); 
            // 创建JavaSparkContext
            JavaSparkContext sc = new JavaSparkContext(conf);
            
            // 使用SparkContext以及其子类的textFile()方法,针对HDFS文件创建RDD
            // 只要把textFile()内的路径修改为hdfs文件路径即可
            JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");
            
            // 统计文本文件内的字数
            JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
                
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(String v1) throws Exception {
                    return v1.length();
                }
                
            });
            
            int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
                
            });
            
            System.out.println("文件总字数是:" + count);  
            
            // 关闭JavaSparkContext
            sc.close();
        }
    }
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
     * @author Administrator
     */
    object LocalFile {
      
      def main(args: Array[String]) {
        val conf = new SparkConf()
            .setAppName("LocalFile") 
            .setMaster("local");  
        val sc = new SparkContext(conf)
        
        val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1);
        val count = lines.map { line => line.length() }.reduce(_ + _)  
        
        println("file's count is " + count)  
      }
    }
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
     * @author Administrator
     */
    object HDFSFile {
      
      def main(args: Array[String]) {
        val conf = new SparkConf()
            .setAppName("HDFSFile") ;  
        val sc = new SparkContext(conf)
        
        val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1);
        val count = lines.map { line => line.length() }.reduce(_ + _)  
        
        println("file's count is " + count)  
      }
    }
    

    相关文章

      网友评论

          本文标题:Spark基本工作原理与RDD

          本文链接:https://www.haomeiwen.com/subject/rfyrottx.html