Development environment:
- Kafka 1.1.1
- Spark 2.3
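Note that the imports below (org.apache.spark.streaming.kafka.KafkaUtils, kafka.serializer.StringDecoder) come from the old spark-streaming-kafka-0-8 connector, so the spark-streaming-kafka-0-8_2.11 artifact matching Spark 2.3 needs to be on the classpath.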
Java code:
package cn.spark.streaming;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
/**
*
* Hot Word Count: counts words read from Kafka over a sliding 60-second
* window, updated every 10 seconds, and prints them sorted by frequency.
*
*/
public class WindowWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("WindowWordCount");
// create a streaming context with a 10-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// Kafka consumer properties
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("bootstrap.servers", "hserver-1:9092,hserver-2:9092,hserver-3:9092");
kafkaParams.put("group.id", "WindowWordCount");
// "smallest" is the old-consumer spelling of "earliest": start reading from
// the beginning of the topic rather than from the latest offsets
kafkaParams.put("auto.offset.reset", "smallest");
// kafka topic set; the topic name is taken from the first CLI argument
Set<String> topics = new HashSet<String>();
topics.add(args[0]);
// create a direct (receiver-less) Kafka input stream
JavaPairInputDStream<String, String> searchLogDStream =
KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics
);
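// each element of searchLogDStream is a (key, message) tuple; producers that
// send plain strings usually leave the key null, so tuple._2 carries the line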
// split each message into words
JavaDStream<String> wordDStream =
searchLogDStream.flatMap(
new FlatMapFunction<Tuple2<String,String>, String>() {
private static final long serialVersionUID = 4034522628037914742L;
@Override
public Iterator<String> call(Tuple2<String, String> tuple) throws Exception {
return Arrays.asList(tuple._2.split(" ")).iterator();
}
});
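// wordDStream now has one element per word, across all messages in the batch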
// map each word to a (word, 1) pair
JavaPairDStream<String, Integer> wordPairDStream =
wordDStream.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 2101884706537316002L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
// sum the 1s per word over a sliding window
JavaPairDStream<String, Integer> windowWordDStream =
wordPairDStream.reduceByKeyAndWindow(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = -358144101893232390L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},
Durations.seconds(60), // window length: count over the last 60 seconds
Durations.seconds(10) // slide interval: emit a result every 10 seconds
);
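// both durations must be multiples of the 10-second batch interval; every
// slide, windowWordDStream holds (word, count) aggregated over the window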
// sort by count descending: swap to (count, word), sortByKey, then swap back
JavaPairDStream<String, Integer> resultSortDStream =
windowWordDStream.transformToPair(
new Function<JavaPairRDD<String,Integer>, JavaPairRDD<String,Integer>>() {
private static final long serialVersionUID = 1441798634812792342L;
@Override
public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> unSortRDD)
throws Exception {
JavaPairRDD<String, Integer> sortRDD = unSortRDD
.mapToPair(
new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = -3715362497048144520L;
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
})
.sortByKey(false)
.mapToPair(
new PairFunction<Tuple2<Integer,String>, String, Integer>() {
private static final long serialVersionUID = 7017380215451671038L;
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)
throws Exception {
return new Tuple2<String, Integer>(tuple._2, tuple._1);
}
});
return sortRDD;
}
});
// print the result; DStream.print() shows the first 10 elements of each RDD,
// i.e. the 10 hottest words of the current window
resultSortDStream.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}
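This version recomputes the whole window on every slide. For longer windows, Spark Streaming also provides an incremental overload of reduceByKeyAndWindow that takes an inverse function and subtracts the batch that just left the window instead of re-aggregating everything. A minimal sketch under the same setup (the checkpoint path is a placeholder; checkpointing is required by this overload):

// enable checkpointing, which the inverse-function overload requires
jssc.checkpoint("hdfs:///tmp/window-wordcount-checkpoint"); // placeholder path
JavaPairDStream<String, Integer> windowWordDStream =
wordPairDStream.reduceByKeyAndWindow(
new Function2<Integer, Integer, Integer>() { // add counts entering the window
@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
},
new Function2<Integer, Integer, Integer>() { // subtract counts leaving the window
@Override
public Integer call(Integer v1, Integer v2) {
return v1 - v2;
}
},
Durations.seconds(60),
Durations.seconds(10)
);

With the inverse function, words whose count drops to zero stay in the stream as (word, 0) pairs; a further overload accepts a filter function to drop them. Since the code never calls setMaster, the master (and the Kafka topic, read from args[0]) are supplied when launching the job with spark-submit.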