Spark Getting Started: Local WordCount (Java Version)

Author: 梦的飞翔_862e | Published 2019-03-13 10:45
    Local development environment

    Java: 1.8
    IDE: IntelliJ IDEA
    Build tool: Maven 3.5.2

    Step 1: Create a Maven project

    Create a new Maven project in IntelliJ IDEA.

    Fill in the groupId and artifactId, then keep clicking Next until Finish.

    Step 2: Configure the pom file

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>cn.spark</groupId>
      <artifactId>spark-study-java</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>spark-study-java</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.0</spark.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.12</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.12</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
          <version>${spark.version}</version>
          <!--<scope>provided</scope>-->
        </dependency>
    
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.6</version>
        </dependency>
        <dependency>
          <groupId>com.thoughtworks.paranamer</groupId>
          <artifactId>paranamer</artifactId>
          <version>2.8</version>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
        <plugins>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass></mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
    
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.6.0</version>
          <executions>
            <execution>
              <goals>
                <goal>exec</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>cn.spark.App</mainClass>
          </configuration>
        </plugin>
    
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
          </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <downloadSources>true</downloadSources>
            </configuration>
        </plugin>
        </plugins>
      </build>
    </project>
    
    Step 3: Write the program

    package cn.spark.study.core;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.*;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * @author jiangxl
     * @description WordCount program for local testing
     * @date 2019-03-12 16:36
     **/
    public class WorldCountLocal {
        public static void main(String[] args) {
            /** Step 1: Create a SparkConf object and set the Spark application's configuration.
             * setMaster sets the URL of the Spark master the application connects to;
             * setting it to "local" means the application runs locally.
             **/
            SparkConf conf = new SparkConf().setAppName("WorldCountLocal").setMaster("local");
            /**
             * Step 2: Create a JavaSparkContext object. The SparkContext is the entry point to all Spark
             * functionality, whatever the language (Java, Scala, Python).
             * Its main jobs are to initialize the core components a Spark application needs (the DAGScheduler
             * and TaskScheduler) and to register the application with the Spark master.
             * The context class depends on the language/API:
             * Scala: the native SparkContext
             * Java: JavaSparkContext
             * Spark SQL: SQLContext / HiveContext
             * Spark Streaming: its own streaming context
             */
            JavaSparkContext jsc = new JavaSparkContext(conf);
            /**
             * Step 3: Create an initial RDD from the input source (HDFS or a local file).
             * The input data is split up and distributed across the RDD's partitions, forming an initial
             * distributed dataset. For local testing the input is a local file.
             * SparkContext's method for creating an RDD from a file-based input source is textFile().
             * In the Java API, a plain RDD is a JavaRDD.
             * An RDD is made up of elements; for HDFS or local files, each element is one line of the file.
             */
            JavaRDD<String> lines = jsc.textFile("D://spark//java//study1.txt");
            /**
             * Step 4: Apply transformation operations (the actual computation) to the initial RDD.
             * First, split each line into individual words.
             * Transformations are usually written as functions passed to RDD operators such as map and flatMap.
             * Simple functions can be anonymous inner classes; complex ones are better as separate classes.
             * flatMap splits one element of the RDD into one or more elements.
             */
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
    
            /**
             * Next, map each word to a (word, 1) pair, so the word can be used as the key when counting occurrences.
             * mapToPair maps each element to an element of type Tuple2.
             * T is the input type; K and V are the Tuple2's key and value types.
             */
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
            /**
             * To count occurrences with the word as the key, use the reduceByKey operator, which reduces all
             * the values belonging to the same key. The reduce operation combines the first value with the
             * second, then combines that result with the third, and so on; e.g. for key "hello" with
             * values [1, 1, 1] it computes (1 + 1) + 1 = 3.
             */
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            /**
             * flatMap, mapToPair and reduceByKey are all transformations; an action such as foreach
             * is needed afterwards to actually trigger execution of the program.
             */
            wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> wordCount) throws Exception {
                    System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times");
                }
            });

            // Release resources once the job has finished
            jsc.close();
        }
    }
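
    Since the project targets Java 1.8, the same job can also be written with lambda expressions instead of anonymous inner classes (Spark's Java function interfaces can be implemented with lambdas). Below is a minimal equivalent sketch; the class name WorldCountLocalLambda is only illustrative, and it reads the same local file as the program above.

    package cn.spark.study.core;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class WorldCountLocalLambda {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("WorldCountLocalLambda").setMaster("local");
            JavaSparkContext jsc = new JavaSparkContext(conf);

            // Same pipeline as above: read lines, split into words, map to (word, 1), sum the counts per key
            JavaRDD<String> lines = jsc.textFile("D://spark//java//study1.txt");
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
            JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((v1, v2) -> v1 + v2);

            // foreach is the action that triggers the job
            wordCounts.foreach(wordCount ->
                    System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times"));

            jsc.close();
        }
    }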
    
    Notes:

    If you use Java 1.8, the paranamer jar must be version 2.8 or later (which is why the pom above declares paranamer 2.8 explicitly); otherwise jsc.textFile(...) fails with an ArrayIndexOutOfBoundsException.
