Spark Foundations 01: Developing a Spark HelloWorld in Eclipse

Author: hylexus | Published 2016-09-21 12:41


    1 Preface

    Spark is hot right now, and more and more people are getting into big data.
    After digging through plenty of material, I finally got this single-machine Spark HelloWorld to run.
    This HelloWorld is not the usual one: rather than simply printing "Hello World", it is a word-count program that counts how often each word appears in a file and sorts the results.

    The same functionality will be implemented three ways: in native Scala, in traditional Java, and in Java 8.

    In fact, the only difference between a single-machine Spark program and one running on a cluster is the runtime environment; the development workflow is identical, as the sketch below shows.
    A later article in this series will cover setting up a cluster.
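
    To make that concrete: when moving from single-machine to cluster execution, only the master URL passed to SparkConf changes. A minimal sketch (the spark://master:7077 address is a placeholder for an assumed cluster master, not a host from this article):

    import org.apache.spark.SparkConf

    object MasterUrlDemo {
        def main(args: Array[String]): Unit = {
            // single-machine mode: everything runs inside this one JVM
            val localConf = new SparkConf().setAppName("WordCount").setMaster("local")

            // cluster mode: only the master URL changes (placeholder address)
            val clusterConf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077")
        }
    }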

    This series will also be updated on both CSDN and Jianshu (简书) as time permits.

    Corrections from fellow cultivators are welcome.

    2 Environment setup

    My environment is as follows:

    C:\Users\hylexus>java -version
    java version "1.8.0_91"
    Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
    Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
    
    C:\Users\hylexus>scala -version
    Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
    

    Eclipse for Scala:

    Scala IDE build of Eclipse SDK
    Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe
    

    Both the Scala and Java versions use Maven to manage dependencies. For how to create a Scala project with Maven, see my other article: http://blog.csdn.net/hylexus/article/details/52602774

    Note: spark-core_2.11 pulls in a frightening number of jar dependencies, so be patient while Maven downloads them…

    2.1 Scala version

    The relevant parts of pom.xml are as follows:

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- match the installed Scala shown above -->
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
    
    
        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs2</groupId>
            <artifactId>specs2-junit_2.11</artifactId>
            <version>2.4.16</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!-- <arg>-make:transitive</arg> -->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                    </compilerArguments>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>
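
    A note on versions: the _2.11 suffix in spark-core_2.11 is the Scala binary version, and it must agree with scala.compat.version above and with the Scala library Maven actually resolves. As a quick sanity check, here is a minimal sketch (the object name CheckScalaVersion is just for illustration) that prints the Scala library version at runtime:

    object CheckScalaVersion {
        def main(args: Array[String]): Unit = {
            // prints something like "version 2.11.8"; the "2.11" part must
            // match the _2.11 suffix of the spark-core artifact
            println(scala.util.Properties.versionString)
        }
    }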
    

    2.2 Java version

    The contents of pom.xml are as follows:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                    </compilerArguments>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>
    

    3 Code

    3.1 Scala — low-key version

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCount {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("WordCount") //
                .setMaster("local")

            val sc = new SparkContext(conf)

            val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
            // read the file contents
            val lines = sc.textFile(fileName, 1)
            // split into words; splitting on spaces only is just for demonstration
            val words = lines.flatMap(line => line.split(" "))
            // String ===> (word, count), count == 1
            val pairs = words.map(word => (word, 1))
            // (word, 1) ==> (word, count)
            val result = pairs.reduceByKey((count, acc) => count + acc)
            // sort by count, descending
            val sorted = result.sortBy(e => e._2, false, 1)

            sorted.foreach(e => println("[" + e._1 + "] appeared " + e._2 + " times"))

            sc.stop()
        }
    }
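
    To build intuition for what this pipeline computes, the same word count can be written with plain Scala collections, no Spark involved (a minimal sketch; groupBy plays the role of reduceByKey's per-key aggregation):

    object WordCountPlain {
        def main(args: Array[String]): Unit = {
            val lines = Seq("a b a", "b a")
            val counts = lines
                .flatMap(_.split(" "))                // all words: a, b, a, b, a
                .groupBy(identity)                    // word -> its occurrences
                .map { case (w, ws) => (w, ws.size) } // word -> count
                .toSeq
                .sortBy(-_._2)                        // descending by count
            // prints [a] appeared 3 times, then [b] appeared 2 times
            counts.foreach { case (w, c) => println("[" + w + "] appeared " + c + " times") }
        }
    }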
    

    3.2 Scala — show-off version

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object WordCount {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf
            conf.setAppName("rank test").setMaster("local")

            val sc = new SparkContext(conf)

            val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"

            sc.textFile(fileName, 1) // lines
                .flatMap(_.split(" ")) // all words
                .map(word => (word, 1)) // to pair
                .reduceByKey(_ + _) // count
                .map(e => (e._2, e._1)) // swap to (count, word) so we can sort by key
                .sortByKey(false, 1) // sort by count, descending
                .map(e => (e._2, e._1)) // swap back to (word, count)
                .foreach(e => println("[" + e._1 + "] appeared " + e._2 + " times"))

            sc.stop()
        }
    }
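
    Incidentally, the swap/sortByKey/swap dance above can be collapsed into a single sortBy on the pair RDD, just as in the low-key version. A drop-in replacement for the chain inside main (same output assumed):

    sc.textFile(fileName, 1)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, false, 1) // sort the pairs directly by count, descending
        .foreach(e => println("[" + e._1 + "] appeared " + e._2 + " times"))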
    

    3.3 Java — traditional version

    The code is almost too ugly to look at…
    Anonymous inner classes everywhere…
    Thankfully Java 8's lambdas are here to rescue you.

    import java.util.Arrays;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    public class WordCount {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
    
            conf.setAppName("WordCounter")//
                    .setMaster("local");
    
            String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";
    
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile(fileName, 1);
    
            JavaRDD<String> words = lines
                    .flatMap(new FlatMapFunction<String, String>() {
                        private static final long serialVersionUID = 1L;
    
                        // in earlier Spark versions this returned Iterable rather than Iterator
                        @Override
                        public Iterator<String> call(String line) throws Exception {
                            return Arrays.asList(line.split(" ")).iterator();
                        }
                    });
    
            JavaPairRDD<String, Integer> pairs = words
                    .mapToPair(new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String word)
                                throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                    });
    
            JavaPairRDD<String, Integer> result = pairs.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Integer e, Integer acc)
                                throws Exception {
                            return e + acc;
                        }
                    }, 1);
    
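        // This map does not change any element; it turns the JavaPairRDD into a
        // JavaRDD<Tuple2<String, Integer>> so that sortBy (available on JavaRDD)
        // can be used to sort by count below.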
            result.map(
                    new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(
                                Tuple2<String, Integer> v1) throws Exception {
                            return new Tuple2<>(v1._1, v1._2);
                        }
                    })//
                    .sortBy(new Function<Tuple2<String, Integer>, Integer>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Tuple2<String, Integer> v1)
                                throws Exception {
                            return v1._2;
                        }
                    }, false, 1)//
                    .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public void call(Tuple2<String, Integer> e)
                                throws Exception {
                        System.out.println("[" + e._1 + "] appeared " + e._2 + " times");
                        }
                    });
            sc.close();
    
        }
    }
    
    

    3.4 Java — lambda version

    With Java 8 lambdas, it actually looks pretty clean.

    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import scala.Tuple2;
    
    public class WordCountByJava8 {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
    
            conf.setAppName("WordCounter")//
                    .setMaster("local");
    
            String fileName = "src/main/java/hylexus/spark/test1/WordCountByJava8.java";
    
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> lines = sc.textFile(fileName, 1);
    
            lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                    .mapToPair(word -> new Tuple2<>(word, 1))
                    .reduceByKey((e, acc) -> e + acc, 1)
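                // identity map: turns the JavaPairRDD into a JavaRDD of tuples
                // so that sortBy can be used for sorting by count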
                    .map(e -> new Tuple2<>(e._1, e._2))
                    .sortBy(e -> e._2, false, 1)
                    .foreach(e -> {
                        System.out.println("[" + e._1 + "] appeared " + e._2 + " times");
                    });
            sc.close();
    
        }
    }
    
    

    4 Sample output

    //...............
    //...............
    [->] appeared 6 times
    [+] appeared 5 times
    [import] appeared 5 times
    [new] appeared 4 times
    [=] appeared 4 times
    //...............
    //...............
    
