文章地址:http://www.haha174.top/article/details/251614
- 简介
Spark Streaming 提供了滑动窗口的操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次落在窗口里面的RDD 数据,会被集合起来,然后生成新的RDD 会作为windows DStream 的一个RDD ,例如对每三秒钟执行一次滑动窗口计算。所以每个滑动窗口的操作,都必须只当两个参数,窗口的长度,以及滑动间隔,而且这两个参数都必须是batch 间隔的整数倍
- 基本操作
这里写图片描述
3.案例
热点搜索词滑动统计,每隔10秒种,统计最近60秒钟的搜索词的搜索频次,打印出词频最高的前三个搜索词 一次出现次数
下面给出java 示例和注释:
public class WindowWord {
public static void main(String[] args) throws InterruptedException {
SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("TransFormBlackList");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));
JavaReceiverInputDStream<String> searchLog=jssc.socketTextStream("www.codeguoj.cn",9999);
// 将搜索词转换成只有一个搜索词
JavaDStream<String> searchWordDStream=searchLog.map(new Function<String, String>() {
@Override
public String call(String v1) throws Exception {
return v1.split(" ")[0];
}
});
JavaPairDStream<String,Integer> searchWordsPairsDSTream=searchWordDStream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
// 第二个参数窗口长度
//第三个参数 滑动间隔
//就是说 每个10秒将最近60秒的数据作为一个窗口
JavaPairDStream<String,Integer> searchWorldCountDStream= searchWordsPairsDSTream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
},Durations.seconds(60),Durations.seconds(10));
//执行transform 操作 根据搜索词进行排序 然后获取排名前三的搜索词
JavaPairDStream<String,Integer> finalRDD= searchWorldCountDStream.transformToPair(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override
public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> v1) throws Exception {
JavaPairRDD<Integer,String> countSearchRDD=v1.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1);
}
});
//然后进行降序排序
JavaPairRDD<Integer,String> softedRDD=countSearchRDD.sortByKey(false);
//再一次进行反转
JavaPairRDD<String,Integer> softedRDDCount=softedRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return new Tuple2<>(integerStringTuple2._2,integerStringTuple2._1);
}
});
List<Tuple2<String,Integer>> listResult= softedRDDCount.take(3);
for(Tuple2<String,Integer> v89:listResult){
System.out.println(v89._1+" " +v89._2);
}
return softedRDDCount;
}
});
finalRDD.print();
jssc.start();
jssc.awaitTermination();
jssc.stop();
jssc.close();
}
}
欢迎关注,更多福利
----
![这里写图片描述](https://img.haomeiwen.com/i7822142/0684cf17d774d75a.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
网友评论