美文网首页
spark之旅-6.structured streaming

spark之旅-6.structured streaming

作者: 笨鸡 | 来源:发表于2022-03-04 05:26 被阅读0次

    Structured Streaming

    Structured Streaming 代码案例

    package com.ctgu.spark.structured_streaming;
    
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;
    import org.apache.spark.sql.streaming.Trigger;
    
    import java.sql.Timestamp;
    import java.util.concurrent.TimeoutException;
    
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.window;
    
    
    /**
     * Structured Streaming example: sliding-window word count over event time.
     *
     * <p>Reads text lines from a socket on {@code localhost:9999}. Each valid line has the form
     * {@code "<word> <epochMillis>"}. Lines are converted to {@link DataInfo} beans, then counted
     * per word in 10-second windows that slide every 5 seconds, with a 10-second watermark.
     * Updated counts are printed to the console.
     *
     * <p>Lines that do not consist of exactly two space-separated tokens, or whose second token
     * is not a valid {@code long}, are dropped instead of failing the query.
     */
    public class Spark_StructuredStreaming_Window {

        /** Separator between the word and its event-time timestamp on each input line. */
        private static final String FIELD_SEPARATOR = " ";

        /**
         * Builds and runs the streaming query, blocking until it terminates.
         *
         * @param args unused
         * @throws TimeoutException declared by {@code DataStreamWriter.start()}
         * @throws StreamingQueryException if the running query terminates with an error
         */
        public static void main(String[] args) throws TimeoutException, StreamingQueryException {
            // Silence Spark's INFO logging so the console sink output stays readable.
            Logger.getLogger("org").setLevel(Level.WARN);

            // Create (or reuse) the local SparkSession that drives the query.
            SparkSession spark = SparkSession
                    .builder()
                    .appName("JavaStructuredNetworkWordCount")
                    .master("local[*]")
                    .getOrCreate();

            // The socket source yields a single string column with one row per received line.
            Dataset<Row> lines = spark
                    .readStream()
                    .format("socket")
                    .option("host", "localhost")
                    .option("port", 9999)
                    .load();

            // Keep only well-formed "<word> <epochMillis>" lines and turn them into beans.
            // Validation happens in the filter so a malformed line cannot throw inside the map
            // stage and terminate the streaming query.
            Dataset<DataInfo> words = lines.as(Encoders.STRING())
                    .filter((FilterFunction<String>) Spark_StructuredStreaming_Window::isWellFormed)
                    .map((MapFunction<String, DataInfo>) Spark_StructuredStreaming_Window::toDataInfo,
                            Encoders.bean(DataInfo.class));

            // Count words per sliding event-time window; the watermark bounds how late data may be.
            Dataset<Row> wordCounts = words.withWatermark("timestamp", "10 seconds")
                    .groupBy(
                            window(col("timestamp"), "10 seconds", "5 seconds"),
                            col("word"))
                    .count();

            // Start the query that prints updated counts to the console.
            // ProcessingTime(0) runs micro-batches back to back.
            // Alternative from the original tutorial (left disabled):
            //     .trigger(Trigger.Continuous(60 * 1000L))
            // NOTE(review): per the Structured Streaming guide, continuous processing supports
            // only map-like operations, so it likely cannot run this aggregation - confirm.
            StreamingQuery query = wordCounts.writeStream()
                    .outputMode("update")
                    .format("console")
                    .option("checkpointLocation", "./checkpoint_chapter9_14")
                    .trigger(Trigger.ProcessingTime(0))
                    .start();

            // Alternative for debugging: print the parsed records without aggregating.
            //     StreamingQuery query = words.writeStream()
            //             .outputMode("append")
            //             .format("console")
            //             .start();

            query.awaitTermination();
        }

        /** Returns true when the line is exactly {@code "<word> <long>"}. */
        private static boolean isWellFormed(String line) {
            if (line == null) {
                return false;
            }
            String[] fields = line.split(FIELD_SEPARATOR);
            if (fields.length != 2) {
                return false;
            }
            try {
                Long.parseLong(fields[1]);
                return true;
            } catch (NumberFormatException ignored) {
                // A non-numeric timestamp marks a malformed record; drop it.
                return false;
            }
        }

        /** Converts a line already accepted by {@link #isWellFormed(String)} into a bean. */
        private static DataInfo toDataInfo(String line) {
            String[] fields = line.split(FIELD_SEPARATOR);
            return new DataInfo(fields[0], new Timestamp(Long.parseLong(fields[1])));
        }
    }
    
    package com.ctgu.spark.structured_streaming;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    
    import java.io.Serializable;
    import java.sql.Timestamp;
    
    /**
     * Bean holding one parsed input record: a word and its event-time timestamp.
     *
     * <p>Lombok's {@code @Data} generates the getters and setters and
     * {@code @AllArgsConstructor} generates the two-argument constructor.
     */
    @Data
    @AllArgsConstructor
    public class DataInfo implements Serializable {
        /** The word carried by the record. */
        private String word;
        /** Event time of the record. */
        private Timestamp timestamp;

        /**
         * No-arg constructor required by the JavaBean convention, so frameworks that instantiate
         * the bean reflectively and then call setters (for example a bean encoder turning rows
         * back into objects) can create instances.
         */
        public DataInfo() {
        }
    }
    
    

    Structured Streaming,思维导图

    Structured-Streaming:
        起点:
            SparkSession
        Source:
            Socket
            Rate
            File
            Kafka
        Operations:
            基础API与Spark-SQL的Dataset相似
            事件时间: 窗口与水位线
            水位线Join
            去重
        State Store:
            HDFS state store
            RocksDB state store
        Streaming Queries:
            output sink:
                File Sink
                Kafka Sink
                Foreach Sink
                ForeachBatch Sink
                Console Sink
                Memory Sink
            Output Mode:
                Append
                Complete 
                Update 
            Query Type:
                aggregation
                mapGroupsWithState
                flatMapGroupsWithState
                joins
                other
            Trigger:
                Continuous
                ProcessingTime
                Once
            Checkpoint location
            lastProgress
            StreamingQueryListener
    

    相关文章

      网友评论

          本文标题:spark之旅-6.structured streaming

          本文链接:https://www.haomeiwen.com/subject/qrsprrtx.html