Implementing WordCount in Spark with Java
Create a Project.
Copy the jar files under /home/bigdata/apps/spark-2.1.0-bin-hadoop2.7/jars/ into the project's libs directory.
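For reference, the copy can also be done from a shell. The project path below is only an illustrative assumption; adjust it to your own workspace:

    # Assumed project location (illustrative)
    mkdir -p ~/IdeaProjects/MyJavaWordCount/libs
    cp /home/bigdata/apps/spark-2.1.0-bin-hadoop2.7/jars/*.jar ~/IdeaProjects/MyJavaWordCount/libs/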
Configure the paths: Project Sources and Dependencies.
Create a new class: MyJavaWordCount.java.
Local mode
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MyJavaWordCount {
    public static void main(String[] args) {
        // Create the configuration object
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount").setMaster("local");
        // Create a SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the input data from HDFS
        JavaRDD<String> rdd1 = sc.textFile("hdfs://bigdata02:9000/wordcount.txt");
        // Split each line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                // Sample input lines:
                //   hello hadoop
                //   hello spark
                //   hello flink
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        // Map each word to a pair: hello -> (hello, 1)
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // Sum the counts per word: (hello, 3)
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // Trigger the computation
        List<Tuple2<String, Integer>> result = rdd4.collect();
        // Print the results
        for (Tuple2<String, Integer> r : result) {
            System.out.println(r._1 + "\t" + r._2);
        }
        // Release resources
        sc.stop();
    }
}
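Spark 2.x runs on Java 8, so the same pipeline can also be written with lambdas instead of anonymous inner classes. A minimal equivalent sketch, with the same logic as above:

    // Lambda version of the same WordCount pipeline (Java 8+)
    JavaRDD<String> words = sc.textFile("hdfs://bigdata02:9000/wordcount.txt")
            .flatMap(line -> Arrays.asList(line.split(" ")).iterator());
    JavaPairRDD<String, Integer> counts = words
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey((a, b) -> a + b);
    counts.collect().forEach(t -> System.out.println(t._1 + "\t" + t._2));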
Exporting the Jar to run on the server
Adjust MyJavaWordCount.java before generating the jar:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MyJavaWordCount {
    public static void main(String[] args) {
        // Create the configuration object; setMaster is removed here so the
        // master can be chosen at submit time
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
        // Create a SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the HDFS input path from the command-line arguments
        JavaRDD<String> rdd1 = sc.textFile(args[0]);
        // Split each line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                // Sample input lines:
                //   hello hadoop
                //   hello spark
                //   hello flink
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        // Map each word to a pair: hello -> (hello, 1)
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // Sum the counts per word: (hello, 3)
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // Trigger the computation
        List<Tuple2<String, Integer>> result = rdd4.collect();
        // Print the results
        for (Tuple2<String, Integer> r : result) {
            System.out.println(r._1 + "\t" + r._2);
        }
        // Release resources
        sc.stop();
    }
}
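Note that collect() pulls every result back to the driver, which is fine for this small demo. For larger data it is more common to write the result back to HDFS instead; a sketch, where the output path args[1] is an assumption not present in the original:

    // Hypothetical variant: write the counts to an HDFS directory passed as
    // the second argument (the directory must not already exist)
    rdd4.saveAsTextFile(args[1]);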
Packaging steps: Project Sources, Artifacts, Build Artifacts.
Export succeeded.
Upload the Jar to the server and run it:
cd /home/bigdata/apps/spark-2.1.0-bin-hadoop2.7
./bin/spark-submit --master spark://bigdata02:7077 --class ****.MyJavaWordCount /home/bigdata/data/MyJavaWordCount.jar hdfs://bigdata02:9000/wordcount.txt
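With an input file like the one sketched in the code comments (hello hadoop / hello spark / hello flink), the job prints something like the following, in no particular order:

    hello	3
    hadoop	1
    spark	1
    flink	1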