- 编写代码
// pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.learn.example</groupId>
<artifactId>spark-example</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Spark core/sql/streaming are part of every Spark distribution,
     so they are "provided": present at compile time, supplied by the
     cluster at run time, and excluded from the packaged jar. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<!-- The Kafka integration is NOT bundled with Spark distributions;
     it must ship with the application (default compile scope), otherwise
     jobs that use it fail with ClassNotFoundException at run time. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- commons-lang3 is already on Spark's classpath, hence "provided". -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spark 2.2 requires Java 8; pin source/target so builds are reproducible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
// 入口
/**
 * Generic entry point: takes the simple name of a job class as {@code args[0]},
 * loads {@code org.learn.example.jobs.<name>} reflectively, and invokes its
 * {@code run(SparkSession)} method.
 */
public class SparkJob {
    public static void main(String[] args) {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException,
        // and before paying the cost of creating a SparkSession.
        if (args.length < 1) {
            System.err.println("Usage: SparkJob <jobClassSimpleName>");
            System.exit(1);
        }
        String appName = args[0];
        SparkSession session = SparkSession.builder().appName(appName).getOrCreate();
        try {
            // Job classes live in a fixed package; args[0] is the simple class name.
            String fullClassName = "org.learn.example.jobs." + appName;
            Class<?> clazz = Class.forName(fullClassName);
            Method method = clazz.getDeclaredMethod("run", SparkSession.class);
            // getDeclaredConstructor().newInstance() replaces the deprecated
            // Class.newInstance(), which rethrows checked constructor exceptions
            // without declaring them.
            method.invoke(clazz.getDeclaredConstructor().newInstance(), session);
        } catch (Exception e) {
            // Driver-side tool: print the failure and still close the session below.
            e.printStackTrace();
        } finally {
            session.close();
        }
    }
}
// 各统计
/** Classic word count over a small in-memory dataset, printing each (word, count) pair. */
public class WordCount implements ISparkJob {
    @Override
    public void run(SparkSession session) {
        // Build an RDD of lines from a fixed sample; Encoders.STRING() gives a Dataset<String>.
        JavaRDD<String> javaRDD =
                session.createDataset(Arrays.asList("aaa", "bbb", "aaa"), Encoders.STRING()).javaRDD();
        javaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                // Method reference is the idiomatic form of (a, b) -> a + b.
                .reduceByKey(Integer::sum)
                // NOTE(review): foreach runs on executors; the println output appears on the
                // driver console only in local mode — fine for this demo, confirm for cluster use.
                .foreach(pair -> System.out.println(pair._1 + " - " + pair._2));
    }
}
- 配置IDEA
pom.xml文件中，`<scope>provided</scope>` 表示该依赖在打包时不会被打进jar包（运行时由集群提供）。
但是用IDEA本地调试时需要用到这些被标记为provided的依赖，因此需要对IDEA的运行配置做相应设置。
（配置截图见 Image 1.png）
- 点击debug进行调试
另外:将代码打包后可以提交到本地运行(使用如下脚本)
-- submit-local.cmd
spark-submit --master local[*] --class org.learn.example.SparkJob target/spark-example-1.0-SNAPSHOT.jar WordCount
网友评论