美文网首页
使用滑动窗口进行实时的热词统计

使用滑动窗口进行实时的热词统计

作者: hipeer | 来源:发表于2018-11-10 16:37 被阅读0次

    开发环境:

    • kafka 1.1.1
    • spark 2.3（Kafka 接入使用 spark-streaming-kafka-0-8 连接器，即 org.apache.spark.streaming.kafka.KafkaUtils）

    Java代码:

    package cn.spark.streaming;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.regex.Pattern;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import kafka.serializer.StringDecoder;
    import scala.Tuple2;
    
    /**
     * Hot word count: real-time word frequencies over a sliding window.
     *
     * <p>Consumes messages from one or more Kafka topics through the Spark Streaming Kafka 0.8
     * direct stream, splits each message value into whitespace-separated words, counts every word
     * over the last {@value #WINDOW_SECONDS} seconds (re-evaluated every {@value #SLIDE_SECONDS}
     * seconds) and prints the words of each window ordered by descending count.
     *
     * <p>Usage: {@code WindowWordCount <topic>[,<topic>...]}
     */
    public class WindowWordCount {
    
        /** Micro-batch interval of the streaming context, in seconds. */
        private static final long BATCH_SECONDS = 10;
    
        /**
         * Length of the sliding window, in seconds. Spark requires it to be a multiple of
         * {@link #BATCH_SECONDS}.
         */
        private static final long WINDOW_SECONDS = 60;
    
        /**
         * How often a new window result is produced, in seconds. Spark requires it to be a multiple
         * of {@link #BATCH_SECONDS}; keeping it no larger than the window ensures no input is skipped.
         */
        private static final long SLIDE_SECONDS = 10;
    
        /** Kafka brokers used to bootstrap the direct stream. */
        private static final String BOOTSTRAP_SERVERS = "hserver-1:9092,hserver-2:9092,hserver-3:9092";
    
        /** Word separator: any run of whitespace characters. */
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");
    
        /**
         * Builds the streaming job and runs it until it is terminated.
         *
         * @param args {@code args[0]} is a Kafka topic name, or a comma-separated list of topic names
         * @throws IllegalArgumentException if no topic is given
         * @throws Exception if the streaming context fails or is interrupted while waiting
         */
        public static void main(String[] args) throws Exception {
            Set<String> topics = args.length > 0 ? parseTopics(args[0]) : new HashSet<String>();
            if (topics.isEmpty()) {
                throw new IllegalArgumentException("Usage: WindowWordCount <topic>[,<topic>...]");
            }
    
            SparkConf conf = new SparkConf().setAppName("WindowWordCount");
            JavaStreamingContext jssc =
                    new JavaStreamingContext(conf, Durations.seconds(BATCH_SECONDS));
    
            try {
                JavaPairInputDStream<String, String> searchLogDStream =
                        KafkaUtils.createDirectStream(
                                jssc,
                                String.class,
                                String.class,
                                StringDecoder.class,
                                StringDecoder.class,
                                createKafkaParams(),
                                topics);
    
                JavaPairDStream<String, Integer> resultSortDStream =
                        sortByCountDescending(countWordsInWindow(searchLogDStream));
    
                // print() outputs only the first ten elements of each RDD, i.e. the top ten words.
                resultSortDStream.print();
    
                jssc.start();
                jssc.awaitTermination();
            } finally {
                // Stop the context even if stream setup or the wait fails.
                jssc.close();
            }
        }
    
        /** Splits a comma-separated topic argument into a set of trimmed, non-empty topic names. */
        private static Set<String> parseTopics(String topicArg) {
            Set<String> topics = new HashSet<String>();
            for (String topic : topicArg.split(",")) {
                String trimmed = topic.trim();
                if (!trimmed.isEmpty()) {
                    topics.add(trimmed);
                }
            }
            return topics;
        }
    
        /** Builds the consumer properties passed to the Kafka direct stream. */
        private static Map<String, String> createKafkaParams() {
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            kafkaParams.put("group.id", "WindowWordCount");
            // "smallest" is the 0.8 consumer's value for starting at the earliest available offset.
            kafkaParams.put("auto.offset.reset", "smallest");
            return kafkaParams;
        }
    
        /**
         * Splits every message value into words and counts each word over the sliding window.
         *
         * @param searchLogDStream stream of Kafka (key, value) records; only the value is used
         * @return per-window (word, count) pairs
         */
        private static JavaPairDStream<String, Integer> countWordsInWindow(
                JavaPairDStream<String, String> searchLogDStream) {
    
            JavaDStream<String> rawWordDStream = searchLogDStream.flatMap(
                    new FlatMapFunction<Tuple2<String, String>, String>() {
    
                        private static final long serialVersionUID = 4034522628037914742L;
    
                        @Override
                        public Iterator<String> call(Tuple2<String, String> record) throws Exception {
                            String line = record._2;
                            // A Kafka record may carry a null value; it contains no words.
                            if (line == null) {
                                return Arrays.<String>asList().iterator();
                            }
                            return Arrays.asList(WHITESPACE.split(line)).iterator();
                        }
                    });
    
            // Leading whitespace makes split() emit an empty first token; it is not a word.
            JavaDStream<String> wordDStream = rawWordDStream.filter(
                    new Function<String, Boolean>() {
    
                        private static final long serialVersionUID = -6129473021574413981L;
    
                        @Override
                        public Boolean call(String word) throws Exception {
                            return !word.isEmpty();
                        }
                    });
    
            JavaPairDStream<String, Integer> wordPairDStream = wordDStream.mapToPair(
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 2101884706537316002L;
    
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                    });
    
            // Argument order is (reduceFunc, windowDuration, slideDuration).
            return wordPairDStream.reduceByKeyAndWindow(
                    new Function2<Integer, Integer, Integer>() {
    
                        private static final long serialVersionUID = -358144101893232390L;
    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                    },
                    Durations.seconds(WINDOW_SECONDS),
                    Durations.seconds(SLIDE_SECONDS));
        }
    
        /**
         * Orders each window's (word, count) pairs by descending count.
         *
         * <p>The pairs are swapped to (count, word) so that {@code sortByKey} can sort on the count,
         * then swapped back.
         *
         * @param windowWordDStream per-window (word, count) pairs
         * @return the same pairs, highest count first within each window
         */
        private static JavaPairDStream<String, Integer> sortByCountDescending(
                JavaPairDStream<String, Integer> windowWordDStream) {
    
            return windowWordDStream.transformToPair(
                    new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
    
                        private static final long serialVersionUID = 1441798634812792342L;
    
                        @Override
                        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> unsortedRDD)
                                throws Exception {
    
                            return unsortedRDD
                                    .mapToPair(
                                            new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    
                                                private static final long serialVersionUID =
                                                        -3715362497048144520L;
    
                                                @Override
                                                public Tuple2<Integer, String> call(
                                                        Tuple2<String, Integer> wordCount) throws Exception {
                                                    return new Tuple2<Integer, String>(wordCount._2, wordCount._1);
                                                }
                                            })
                                    .sortByKey(false)
                                    .mapToPair(
                                            new PairFunction<Tuple2<Integer, String>, String, Integer>() {
    
                                                private static final long serialVersionUID =
                                                        7017380215451671038L;
    
                                                @Override
                                                public Tuple2<String, Integer> call(
                                                        Tuple2<Integer, String> countWord) throws Exception {
                                                    return new Tuple2<String, Integer>(countWord._2, countWord._1);
                                                }
                                            });
                        }
                    });
        }
    }
    
    

    相关文章

      网友评论

          本文标题:使用滑动窗口进行实时的热词统计

          本文链接:https://www.haomeiwen.com/subject/vsnzxqtx.html