美文网首页
TCP Socket数据流WordCount

TCP Socket数据流WordCount

作者: hipeer | 来源:发表于2018-11-08 08:41 被阅读0次

    安装nc: yum install nc，然后在 hserver-1 上运行 nc -lk 9999 启动数据源（程序会连接该端口读取文本行）

    WordCount
    package cn.spark.streaming;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    
    /**
     * Socket-based streaming word count.
     *
     * <p>Reads text lines from a TCP socket (for example one served by {@code nc -lk 9999}),
     * splits each line into whitespace-separated words, and prints the per-word counts of
     * every 1-second batch.
     *
     * <p>Usage: {@code WordCount [host [port]]}. When omitted, the host defaults to
     * {@code hserver-1} and the port to {@code 9999}. The Spark master is not set here,
     * so it must be supplied externally (e.g. via {@code spark-submit --master ...}).
     *
     * @author ThinkVision
     */
    public class WordCount {

        /** Socket host used when no host argument is given. */
        private static final String DEFAULT_HOST = "hserver-1";

        /** Socket port used when no port argument is given. */
        private static final int DEFAULT_PORT = 9999;

        /**
         * Starts the streaming word count and blocks until the streaming context terminates.
         *
         * @param args optional {@code [host [port]]} of the text socket to read from
         * @throws NumberFormatException if a port argument is given but is not an integer
         * @throws Exception if the streaming job fails or the wait for termination is interrupted
         */
        public static void main(String[] args) throws Exception {

            String host = args.length > 0 ? args[0] : DEFAULT_HOST;
            int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

            SparkConf conf = new SparkConf().setAppName("WordCount");

            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
            try {
                // create InputDStream: one element per text line received on the socket
                JavaReceiverInputDStream<String> lineInputDStream = jssc.socketTextStream(host, port);

                // transform lineInputDStream into wordDStream
                JavaDStream<String> wordDStream = lineInputDStream.flatMap(

                        new FlatMapFunction<String, String>() {

                            private static final long serialVersionUID = 425086794973670380L;

                            @Override
                            public Iterator<String> call(String line) throws Exception {
                                // Split on runs of whitespace and skip blank lines so that repeated
                                // spaces, tabs, or empty input never produce "" as a counted word.
                                String trimmed = line.trim();
                                String[] words = trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
                                return Arrays.asList(words).iterator();
                            }
                        });

                // transform wordDStream into pairDStream: (word, 1)
                JavaPairDStream<String, Integer> pairDStream = wordDStream.mapToPair(

                        new PairFunction<String, String, Integer>() {

                            private static final long serialVersionUID = -7388596223277641539L;

                            @Override
                            public Tuple2<String, Integer> call(String word) throws Exception {

                                return new Tuple2<String, Integer>(word, 1);
                            }
                        });

                // reduceByKey: sum the 1s per word within each batch
                JavaPairDStream<String, Integer> resultDStream = pairDStream.reduceByKey(

                        new Function2<Integer, Integer, Integer>() {

                            private static final long serialVersionUID = 5957698719526594872L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {

                                return v1 + v2;
                            }
                        });

                resultDStream.print();

                jssc.start();

                jssc.awaitTermination();
            } finally {
                // Release the streaming context even if the job fails or the wait is interrupted.
                jssc.close();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:TCP Socket数据流WordCount

          本文链接:https://www.haomeiwen.com/subject/mnjixqtx.html