场景
实时数据存储在kafka,时间顺序不一定,计算需使用到其他静态资源(rest API或数据库中)
要求按天计算,计算时有时间顺序要求,每小时计算一次,结果输出到kafka
关键点
window
checkpointLocation
主要用于记录一些metadata,offset和算子计算的中间结果,用于故障恢复和重启
参考:spark-checkpointing
startingOffsets
初始读取kafka的偏移量,当checkpointLocation不存在时使用,或者当算子更新checkpointLocation失效时
参考:http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
方案1:窗口计算
使用structured streaming 窗口计算,窗口长度24小时,步长24小时,设置watermark为48小时。
Dataset<Row> lines = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "_")
.option("subscribe", "topic")
.option("startingOffsets", "{\"topic\":{\"0\":_offset_}}")
.load();
Dataset<Object> dataset = lines.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING())
.mapPartitions(
(MapPartitionsFunction<String, Object>) x ->
{
List<Object> objs = new ArrayList<>();
...
do some thing
...
return objs.iterator();
},
Encoders.bean(Objects.class))
.filter(Objects::nonNull);
Dataset<Row> dataset2 = dataset.withWatermark("timestamp", "48 hours")
.groupBy(functions.col("groupId"),
functions.window(functions.col("timestamp"), "24 hours"))
.agg(functions.collect_list("data").as("data"));
Dataset<Object> dataset3 = dataset2.map(
(MapFunction<Row, Object>) x ->
{
for (Object obj : x.getList(2))
{
// collect_list 得到的不是结构化的数据,取值比较麻烦,没找到合适的方法
GenericRowWithSchema schema = (GenericRowWithSchema) obj;
int fieldIndex = schema.fieldIndex("fieldName");
long fieldValue = schema.getLong(fieldIndex);
...
}
return ...
}, Encoders.bean(Object.class));
# 输出到kafka需要配置checkpointLocation,集群建议用hdfs
StreamingQuery query = dateset3.toJSON()
.writeStream()
.outputMode("update")
.option("checkpointLocation", "hdfs://host:port/checkpoints")
.format("kafka")
.option("kafka.bootstrap.servers", "_")
.option("topic", "topic")
.start();
query.awaitTermination();
方案2 mapGroupsWithState状态流
窗口函数相当于spark决定了数据是否进入到计算(eventTime早于watermark则丢弃),是否过期(窗口时间早于watermark,意味着该窗口不会再触发更新),而mapGroupsWithState需要自己维护数据过期时间
参考:https://blog.csdn.net/bluishglc/article/details/80824522
使用SomeState自己存储和更新24小时的数据,并用于计算
Dataset<Row> lines = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "_")
.option("subscribe", topic)
.option("startingOffsets", ...)
.load();
MapPartitionsFunction<String, SomeData> dataMapFunc =
x ->
{
List<SomeData> msgs = new ArrayList<>();
while (x.hasNext())
{
...do something
}
return msgs.iterator();
};
Dataset<SomeData> dataset = lines.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING())
.mapPartitions(dataMapFunc,
Encoders.bean(SomeData.class))
.filter(Objects::nonNull);
String delayThreshold = "24 hours";
MapGroupsWithStateFunction<Long, SomeData, SomeState,
SomeOutPut> mapGroupsWithStateFunc =
(groupKey, dataIterator, groupState) ->
{
#spark只在每次触发计算才知到状态有没有过期
if (groupState.hasTimedOut())
{
groupState.remove();
return ...;
}
SomeState state = groupState.exists() ? groupState.get() : new SomeState();
...
state.addData(dataIterator);
...
groupState.update(state);
groupState.setTimeoutTimestamp((new Date).getTime(), delayThreshold);
return new SomeOutPut();
};
Dataset<SomeOutPut> d = dataset.withWatermark("timestamp", delayThreshold)
.groupByKey(SomeData::getKey, Encoders.LONG())
.mapGroupsWithState(
mapGroupsWithStateFunc,
Encoders.bean(SomeState.class),
Encoders.bean(SomeOutPut.class),
GroupStateTimeout.EventTimeTimeout());
# 过滤掉过期触发的无效结果
StreamingQuery query = d.filter(SomeOutPut::isValid)
.toJSON()
.writeStream()
.outputMode("update")
.option("checkpointLocation", "hdfs://host:port/checkpoints")
.format("kafka")
.option("kafka.bootstrap.servers", "_")
.option("topic", "topic")
.start();
query.awaitTermination();
网友评论