Spark WordCount on a Cluster

Author: 阿坤的博客 | Published 2018-07-06 10:27

This post walks through writing a WordCount program in Scala and submitting it to a Spark cluster. A working Spark cluster is required first; for setup, see the companion post "Spark on Yarn 环境搭建" (Spark on YARN environment setup).

Contents:

  • 1. Create the project
  • 2. Main program
  • 3. Submit and run

Related posts:
1. Spark: computing Pi locally
2. Spark: WordCount on a cluster
3. Spark Streaming: reading data from Kafka
4. Spark Streaming: saving Kafka offsets in Redis
5. Spark Streaming: graceful shutdown
6. Spark Streaming: writing data to Kafka
7. Spark: word cloud from short reviews of the film 《西虹市首富》

1. Create a Maven project and add dependencies

(Screenshot: project structure)
The pom.xml dependencies and build configuration:

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <goals>
            <goal>compile</goal>
          </goals>
          <configuration>
            <includes>
              <include>**/*.scala</include>
            </includes>
          </configuration>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

This pulls in the Spark core dependency and the Scala compiler plugin. Note that the spark-core_2.11 artifact is built against Scala 2.11, so the project's Scala version should match.

2. Main program

ScalaWordCount.scala:

package me.jinkun.scala.wc

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object ScalaWordCount {
  val LOG = LoggerFactory.getLogger("ScalaWordCount")

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      LOG.error("Usage: ScalaWordCount <input path> <output path>")
      System.exit(1)
    }

    // Create the Spark configuration
    val conf = new SparkConf()
      .setAppName("ScalaWordCount")

    // Create the SparkContext
    val sc = new SparkContext(conf)

    // WordCount: split lines into words, count each word,
    // then sort by count in descending order
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .repartition(1)
      .sortBy(_._2, false)
      .saveAsTextFile(args(1))

    // Stop the SparkContext
    sc.stop()
  }
}

The program takes two arguments: the input file and the output directory.
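
For a quick test from the IDE, without a cluster, the same logic can run in Spark local mode. A minimal sketch (the local input path below is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // Run inside the local JVM instead of on YARN; local[*] uses all cores
    val conf = new SparkConf()
      .setAppName("LocalWordCount")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Same pipeline as above, against a local file (hypothetical path)
    sc.textFile("data/words.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect()
      .foreach(println)

    sc.stop()
  }
}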

Submit the job to the cluster with:

./bin/spark-submit \
--class me.jinkun.scala.wc.ScalaWordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 1G \
--executor-cores 1 \
"/opt/soft-install/data/spark-yarn-1.0-SNAPSHOT.jar" \
"hdfs://hadoop1:9000/data/input/words.txt" "hdfs://hadoop1:9000/data/output/wc.txt"

The first argument, hdfs://hadoop1:9000/data/input/words.txt, is the words.txt input file on HDFS.
The second argument, hdfs://hadoop1:9000/data/output/wc.txt, is an output directory on HDFS (a directory despite the .txt name). It must not exist before the job runs, otherwise the job fails with an error.
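
If you want a rerun to overwrite the previous output, one common approach (a sketch using the Hadoop FileSystem API; not part of the original program) is to delete the output path before saving:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Remove the output directory if it already exists, so that
// saveAsTextFile does not fail. Assumes the SparkContext `sc`
// and `args` from the main program are in scope.
val outputPath = new Path(args(1))
val fs = FileSystem.get(new URI(args(1)), sc.hadoopConfiguration)
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // true = recursive
}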

Contents of words.txt:

hello me
hello you
hello her
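
As a sanity check, the same pipeline can be rehearsed on this input with plain Scala collections, no cluster required (a sketch; the order of the 1-count ties is arbitrary):

val lines = List("hello me", "hello you", "hello her")

val counts = lines
  .flatMap(_.split(" "))                  // all words
  .groupBy(identity)                      // word -> occurrences
  .map { case (w, ws) => (w, ws.size) }   // word -> count
  .toList
  .sortBy(-_._2)                          // descending by count

println(counts) // List((hello,3), ...) with the 1-count words in arbitrary order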

In IDEA, open the Maven Projects panel on the right and package the program into a jar. (Screenshot: Maven Projects panel)

Run the package goal to generate the jar, then locate the generated jar, upload it to the cluster, and execute the spark-submit command above.

Check the generated files in the HDFS web UI. (Screenshot: HDFS file browser)
The result:

(hello,3)
(you,1)
(her,1)
(me,1)
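
To read the saved counts back from HDFS, for example from spark-shell on the cluster (a sketch; assumes a SparkContext `sc` is available):

// Read the output directory back from HDFS and print each line
sc.textFile("hdfs://hadoop1:9000/data/output/wc.txt")
  .collect()
  .foreach(println)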
