美文网首页
Structured-Streaming 学习一——wordco

Structured-Streaming 学习一——wordco

作者: 九七学姐 | 来源:发表于2017-07-24 17:07 被阅读219次

熟悉一个框架最好的方法就是先熟悉例子,明白其工作原理。
所以,先照猫画虎写一下structured streaming中给的wordcount的例子

public class WordCount {
    public static void main(String[] args) throws StreamingQueryException {
        args = (String[]) Arrays.asList("hostname", "port").toArray();     //自己的主机名和spark端口
        if(args.length < 2){
            System.err.println("Usage:JavaStructuredNetworkWordCount localhost:7077");
            System.exit(1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);

        SparkSession spark = SparkSession
                .builder()
                .master("local")      //set the mater of spark,usually local,local[],Spark cluster address.
                .appName("JavaStructuredNetworkWordCount")
                .getOrCreate();


       
        // Create DataFrame representing the stream of input lines from connection to localhost:9999
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")  // 目前Source源只支持File 和 Socket 两种,如果是File的话format填正确的格式,
                // 比如MySql的话.format("json")
                .option("host", "localhost")
                .option("port",9999)
                .load();

        // Split the lines into words
        Dataset<String> words = lines
                .as(Encoders.STRING())
                .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();


        // Start running the query that prints the running counts to the console
        //Object seconds;
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")  //complete,append,update。
                // 目前只支持前面两种complete,每次计算完成后,你都能拿到全量的计算结果。
                // append,每次计算完成后,你能拿到增量的计算结果。
                .format("console")   ///输出则是四种console,parquet,memory,foreach 四种.foreach则是可以无限扩展的
                //.trigger(ProcessingTime(5.seconds))
                .start();

        query.awaitTermination();



    }
}

运行成功之后在开始兴奋的开启下一个征程吧~

相关文章

网友评论

      本文标题:Structured-Streaming 学习一——wordco

      本文链接:https://www.haomeiwen.com/subject/bjdjkxtx.html