Java Spark Simple Examples (6): Spark Streaming

Author: 憨人Zoe | Published 2018-09-26 13:28

    The Flink study notes shared a couple of days ago introduced tumbling windows and sliding windows; Spark Streaming supports both as well.
    Java Spark Simple Examples (5): Spark Streaming demonstrated the usual Spark Streaming usage, which is effectively a tumbling window: we set a batch interval, Spark advances the window by that interval, and there is no overlapping data between windows.

    //batch interval, i.e. the length of one tumbling window; the window advances by the same amount
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
    

    This article builds on the previous demo and the official documentation to show how to implement a sliding window.

    From the official docs: window(windowLength, slideInterval) — "Return a new DStream which is computed based on windowed batches of the source DStream." Here windowLength is the length of the sliding window and slideInterval is the sliding interval. Both windowLength and slideInterval must be integer multiples of the batch interval, i.e. multiples of the 3 s defined above. If slideInterval is omitted, it defaults to the batch interval (3 s here).
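The multiple-of-batch-interval rule can be sketched as a simple check (plain Java, no Spark; `WindowCheck` is a hypothetical helper, not part of the Spark API):

```java
public class WindowCheck {
    // Both the window length and the slide interval must be integer
    // multiples of the batch interval; Spark rejects other values.
    static boolean isValid(long batchMs, long windowMs, long slideMs) {
        return windowMs % batchMs == 0 && slideMs % batchMs == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValid(3000, 6000, 3000)); // true:  6 s and 3 s are multiples of 3 s
        System.out.println(isValid(3000, 5000, 3000)); // false: 5 s is not a multiple of 3 s
    }
}
```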

    package com.yzy.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    public class demo7 {
        private static String appName = "spark.streaming.demo";
        private static String master = "local[*]";
        private static String host = "localhost";
        private static int port = 9999;
    
        public static void main(String[] args) {
            //initialize SparkConf
            SparkConf sparkConf = SparkConfig.getSparkConf().setMaster(master).setAppName(appName);
    
            //create the JavaStreamingContext with a 3-second batch interval
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(3));
    
            //read data from a socket source
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
    
            //split each line into words
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
    
            //call window() to create a new DStream: every 3 seconds, aggregate the last 6 seconds of source data; the slide interval defaults to the batch interval (3 s)
            //equivalent to words.window(Durations.seconds(6), Durations.seconds(3));
            JavaDStream<String> newWords = words.window(Durations.seconds(6));
    
            //count the occurrences of each word
            JavaPairDStream<String, Integer> wordCounts = newWords.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
    
            //print the results
            wordCounts.print();
    
            //start the job
            ssc.start();
            try {
                ssc.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ssc.close();
            }
        }
    }
    

    Output:

    -------------------------------------------
    Time: 1537934565000 ms
    -------------------------------------------
    (spark,1)
    (1,1)
    (test,1)
    (streaming,1)
    
    -------------------------------------------
    Time: 1537934568000 ms
    -------------------------------------------
    (spark,4)
    (1,1)
    (2,1)
    (3,1)
    (4,1)
    (test,4)
    (streaming,4)
    
    -------------------------------------------
    Time: 1537934571000 ms
    -------------------------------------------
    (spark,6)
    (2,1)
    (3,1)
    (4,1)
    (5,1)
    (6,1)
    (7,1)
    (test,6)
    (streaming,6)
    
    -------------------------------------------
    Time: 1537934574000 ms
    -------------------------------------------
    (spark,6)
    (10,1)
    (5,1)
    (6,1)
    (7,1)
    (8,1)
    (9,1)
    (test,6)
    (streaming,6)
    
    -------------------------------------------
    Time: 1537934577000 ms
    -------------------------------------------
    (spark,6)
    (10,1)
    (11,1)
    (12,1)
    (13,1)
    (8,1)
    (9,1)
    (test,6)
    (streaming,6)
    ....
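The overlap visible in the output can be reproduced with a small plain-Java simulation (no Spark; `SlidingWindowSim` is a hypothetical helper): with a 6-second window and a 3-second slide, each emission merges the word counts of the two most recent 3-second batches, which is why counts roughly double once two batches overlap.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlidingWindowSim {
    // Count words across the batches that fall inside the window ending at endIndex.
    static Map<String, Integer> windowCounts(List<List<String>> batches,
                                             int windowBatches, int endIndex) {
        Map<String, Integer> counts = new HashMap<>();
        int start = Math.max(0, endIndex - windowBatches + 1);
        for (int i = start; i <= endIndex; i++) {
            for (String word : batches.get(i)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two consecutive 3-second batches of words.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("spark", "test", "1"),
                Arrays.asList("spark", "spark", "spark", "test", "2"));
        // Window = 6 s / 3 s batch = 2 batches, so the second emission sees both batches.
        System.out.println(windowCounts(batches, 2, 0)); // first emission: batch 0 only
        System.out.println(windowCounts(batches, 2, 1)); // second emission: batches 0 + 1 merged
    }
}
```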
    

    Besides transforming the DStream with window(), you can call reduceByKeyAndWindow() directly, so that the aggregation itself runs over the sliding window. For example:

    //..... omitted
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s,1);
                }
            });
    
    JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            }, Durations.seconds(6), Durations.seconds(3));//the durations work the same way as in window()
    
    windowedWordCounts.print();
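reduceByKeyAndWindow also has an overload that takes an inverse reduce function, reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval). Instead of re-reducing every batch in the window on each slide, Spark adds the counts of the batch entering the window and subtracts those of the batch leaving it (this variant requires a checkpoint directory). A minimal plain-Java sketch of that incremental idea (`IncrementalWindow` is a hypothetical helper, not Spark code):

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalWindow {
    // next window = current window + entering batch - leaving batch
    static Map<String, Integer> slide(Map<String, Integer> window,
                                      Map<String, Integer> entering,
                                      Map<String, Integer> leaving) {
        Map<String, Integer> next = new HashMap<>(window);
        entering.forEach((k, v) -> next.merge(k, v, Integer::sum));
        leaving.forEach((k, v) -> next.merge(k, -v, Integer::sum));
        next.values().removeIf(v -> v == 0); // drop words that fully left the window
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> window = new HashMap<>();
        window.put("spark", 3);
        window.put("test", 2);
        Map<String, Integer> entering = new HashMap<>();
        entering.put("spark", 1);
        Map<String, Integer> leaving = new HashMap<>();
        leaving.put("test", 2);
        System.out.println(slide(window, entering, leaving)); // {spark=4}
    }
}
```

This is why the inverse-function overload is cheaper for long windows with short slides: only two batches are touched per slide instead of the whole window.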
    


Source: https://www.haomeiwen.com/subject/ieudoftx.html