Structured Streaming
Structured Streaming code example: a sliding-window word count over a socket source, using event time and a watermark.
package com.ctgu.spark.structured_streaming;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.sql.Timestamp;
import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class Spark_StructuredStreaming_Window {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        Logger.getLogger("org").setLevel(Level.WARN);

        // Create the SparkSession, the entry point of Structured Streaming
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local[*]")
                .getOrCreate();

        // Read a stream of text lines from a socket source
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();

        // Parse each line of the form "word timestampMillis" into a DataInfo bean;
        // malformed lines are dropped by the filter
        Dataset<DataInfo> words = lines.as(Encoders.STRING())
                .filter((FilterFunction<String>) line -> line.split(" ").length == 2)
                .map((MapFunction<String, DataInfo>) line -> {
                    String[] s = line.split(" ");
                    return new DataInfo(s[0], new Timestamp(Long.parseLong(s[1])));
                }, Encoders.bean(DataInfo.class));

        // Count words per 10-second window sliding every 5 seconds; the watermark
        // lets Spark drop window state once event time has advanced 10 seconds past it
        Dataset<Row> wordCounts = words.withWatermark("timestamp", "10 seconds")
                .groupBy(
                        window(col("timestamp"), "10 seconds", "5 seconds"),
                        col("word"))
                .count();

        // Start the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("checkpointLocation", "./checkpoint_chapter9_14")
                // Continuous triggers do not support aggregations, so a
                // micro-batch ProcessingTime trigger is used instead
                // .trigger(Trigger.Continuous(60 * 1000L))
                .trigger(Trigger.ProcessingTime(0))
                .start();

        // Alternative: print the parsed events themselves in append mode
        // StreamingQuery query = words.writeStream()
        //         .outputMode("append")
        //         .format("console")
        //         .start();

        query.awaitTermination();
    }
}
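To try it, start a local socket server with `nc -lk 9999` and type lines of the form `word timestampMillis` (for example `hello 1693550000000`); each micro-batch then prints the updated window counts to the console. The example relies on the following JavaBean: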
package com.ctgu.spark.structured_streaming;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;
import java.sql.Timestamp;

@Data
@AllArgsConstructor
public class DataInfo implements Serializable {
    private String word;
    private Timestamp timestamp;
}
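Lombok's @Data generates the JavaBean getters and setters that Encoders.bean relies on, and @AllArgsConstructor supplies the two-argument constructor used in the map function above.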
Structured Streaming mind map

Structured-Streaming:
    Entry point:
        SparkSession
    Source:
        Socket
        Rate
        File
        Kafka (see the sketch after this map)
    Operations:
        Basic API similar to the Spark SQL Dataset API
        Event time: windows and watermarks
        Watermarked joins
        Deduplication (see the sketch after this map)
    State Store:
        HDFS state store
        RocksDB state store
    Streaming Queries:
        Output sink:
            File Sink
            Kafka Sink
            Foreach Sink
            ForeachBatch Sink (see the sketch after this map)
            Console Sink
            Memory Sink
        Output Mode:
            Append
            Complete
            Update
        Query Type:
            aggregation
            mapGroupsWithState
            flatMapGroupsWithState
            joins
            other
        Trigger:
            Continuous
            ProcessingTime
            Once
        Checkpoint location
        lastProgress
        StreamingQueryListener (see the listener sketch after this map)
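The map above lists several pieces that the word-count example does not exercise. As a rough combined illustration of the Kafka source, watermark-based deduplication, and the foreachBatch sink, here is a minimal sketch: the broker address localhost:9092, the topic name events, the parquet output and checkpoint paths, and the value layout (a single word per record) are all placeholder assumptions, and running it requires the spark-sql-kafka-0-10 connector on the classpath.

package com.ctgu.spark.structured_streaming;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Spark_StructuredStreaming_KafkaSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("KafkaForeachBatchSketch")
                .master("local[*]")
                .getOrCreate();

        // Kafka source; broker and topic are assumed values
        Dataset<Row> events = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "events")
                .load()
                // Kafka delivers key/value as binary; keep the decoded value
                // and the broker-assigned timestamp column
                .selectExpr("CAST(value AS STRING) AS word", "timestamp");

        // Deduplication with a watermark: repeated (word, timestamp) pairs are
        // dropped, and the dedup state is purged once it falls behind the watermark
        Dataset<Row> deduped = events
                .withWatermark("timestamp", "10 minutes")
                .dropDuplicates("word", "timestamp");

        // ForeachBatch sink: each micro-batch is handed over as an ordinary
        // Dataset, so any batch writer can be reused; batchId identifies the
        // micro-batch for exactly-once bookkeeping
        StreamingQuery query = deduped.writeStream()
                .option("checkpointLocation", "./checkpoint_kafka_sketch")
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
                        batchDf.write().mode("append").parquet("./output"))
                .start();

        query.awaitTermination();
    }
}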
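Likewise, a minimal sketch of monitoring via lastProgress and StreamingQueryListener; the class name ProgressListenerSketch and its register helper are hypothetical, and the listener merely prints each progress report, which is the same snapshot that query.lastProgress() returns on demand.

package com.ctgu.spark.structured_streaming;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class ProgressListenerSketch {
    // Call once after creating the SparkSession, e.g. in the word-count example above
    public static void register(SparkSession spark) {
        spark.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent event) {
                System.out.println("Query started: " + event.id());
            }

            @Override
            public void onQueryProgress(QueryProgressEvent event) {
                // Fired after every trigger; query.lastProgress() exposes the
                // same StreamingQueryProgress snapshot on demand
                System.out.println(event.progress().prettyJson());
            }

            @Override
            public void onQueryTerminated(QueryTerminatedEvent event) {
                System.out.println("Query terminated: " + event.id());
            }
        });
    }
}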