
Spark Structured Streaming java

Author: shaun_x | Published 2019-04-09 14:28

    Scenario

    Real-time data lives in Kafka and does not necessarily arrive in time order; the computation also depends on static resources (from a REST API or a database).
    The results must be computed per day, the computation is order-sensitive, it runs once an hour, and the output is written back to Kafka.

    Key points

    window

    Reference: spark window on event time
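
    As a quick illustration (a minimal sketch; the timestamp column name is a placeholder), passing equal window and slide durations to functions.window gives a tumbling window, while a shorter slide gives overlapping windows:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.functions;

    // Tumbling 24-hour buckets: window length equals slide, so each event
    // lands in exactly one bucket
    Column tumbling = functions.window(functions.col("timestamp"), "24 hours", "24 hours");

    // Sliding windows: a new 24-hour bucket starts every hour, so each event
    // belongs to up to 24 overlapping buckets
    Column sliding = functions.window(functions.col("timestamp"), "24 hours", "1 hour");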

    checkpointLocation

    Mainly used to record metadata, offsets, and intermediate operator state, for failure recovery and restarts.
    Reference: spark-checkpointing
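
    A point worth noting (a sketch; df stands for any streaming Dataset, and broker, topic, and path are placeholders): as long as a restarted query reuses the same checkpointLocation, it resumes from the offsets and state recorded there rather than from startingOffsets:

    // Restarting with the same checkpoint directory resumes from the offsets
    // and state recorded there; startingOffsets is then ignored.
    StreamingQuery query = df.writeStream()                            // df: any streaming Dataset
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")                // placeholder
        .option("topic", "topic")
        .option("checkpointLocation", "hdfs://host:port/checkpoints")  // keep stable across restarts
        .start();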

    startingOffsets

    The initial Kafka read offsets. They are used when no checkpointLocation exists yet, or when a change to the operators invalidates the existing checkpoint.
    Reference: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
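
    For concreteness (a sketch based on the Kafka integration guide; broker, topic, and offsets are placeholders): startingOffsets accepts "earliest", "latest" (the streaming default), or a JSON map of per-partition offsets, where -2 means earliest and -1 means latest:

    // Read from the earliest available offsets when no checkpoint exists yet
    Dataset<Row> fromEarliest = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")   // placeholder
        .option("subscribe", "topic")
        .option("startingOffsets", "earliest")
        .load();

    // JSON form: partition 0 of "topic" starts at offset 100,
    // partition 1 starts at the earliest available offset (-2)
    String perPartition = "{\"topic\":{\"0\":100,\"1\":-2}}";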

    Solution 1: window aggregation

    Use Structured Streaming window aggregation with a 24-hour window length, a 24-hour slide (a tumbling window), and a 48-hour watermark.

    Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", "topic")
        .option("startingOffsets", "{\"topic\":{\"0\":_offset_}}")
        .load();
    
    Dataset<SomeData> dataset = lines.selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .mapPartitions(
        (MapPartitionsFunction<String, SomeData>) x ->
        {
            List<SomeData> objs = new ArrayList<>();
            // ... parse each record and add the resulting SomeData beans to objs ...
            return objs.iterator();
        },
        Encoders.bean(SomeData.class))
        .filter(Objects::nonNull);
    
     Dataset<Row> dataset2 = dataset.withWatermark("timestamp", "48 hours")
        .groupBy(functions.col("groupId"),
                 functions.window(functions.col("timestamp"), "24 hours"))
        .agg(functions.collect_list("data").as("data"));
    
    Dataset<SomeOutPut> dataset3 = dataset2.map(
        (MapFunction<Row, SomeOutPut>) x ->
        {
            for (Object obj : x.getList(2))
            {
                // collect_list does not yield strongly typed data; extracting
                // values is awkward (see the Row.getAs sketch after this listing)
                GenericRowWithSchema row = (GenericRowWithSchema) obj;
                int fieldIndex = row.fieldIndex("fieldName");
                long fieldValue = row.getLong(fieldIndex);
                // ...
            }
            return new SomeOutPut(); // ... build the aggregated result ...
        }, Encoders.bean(SomeOutPut.class));
    
    // Writing to Kafka requires a checkpointLocation; on a cluster HDFS is recommended
    StreamingQuery query = dataset3.toJSON()
        .writeStream()
        .outputMode("update")
        .option("checkpointLocation", "hdfs://host:port/checkpoints")
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("topic", "topic")
        .start();
    
    query.awaitTermination();
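
    Regarding the collect_list comment in the map step: one possible simplification (a sketch; fieldName is a placeholder) is to treat each collected element as a plain Row and read fields by name with Row.getAs, avoiding the cast to GenericRowWithSchema and the manual fieldIndex lookup:

    import org.apache.spark.sql.Row;

    // Each element collected from a struct column is a Row, so the generic
    // Row API can read fields by name directly
    for (Object obj : x.getList(2))
    {
        Row row = (Row) obj;
        long fieldValue = row.<Long>getAs("fieldName"); // placeholder field name
    }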
    

    Solution 2: stateful stream with mapGroupsWithState

    With window aggregation, Spark decides whether a record enters the computation (records whose eventTime is earlier than the watermark are dropped) and whether a window has expired (a window earlier than the watermark will never be updated again). With mapGroupsWithState, you have to manage the expiry of your state yourself.
    Reference: https://blog.csdn.net/bluishglc/article/details/80824522
    Here a SomeState bean stores and updates 24 hours of data per key and drives the computation.

    Dataset<Row> lines = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "_")
        .option("subscribe", topic)
        .option("startingOffsets", ...)
        .load();
    
    MapPartitionsFunction<String, SomeData> dataMapFunc =
        x ->
        {
            List<SomeData> msgs = new ArrayList<>();
            while (x.hasNext())
            {
                String record = x.next();
                // ... parse record into a SomeData bean and add it to msgs ...
            }
            return msgs.iterator();
        };
    
    Dataset<SomeData> dataset = lines.selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .mapPartitions(dataMapFunc, 
        Encoders.bean(SomeData.class))
        .filter(Objects::nonNull);
    
    String delayThreshold = "24 hours";
    MapGroupsWithStateFunction<Long, SomeData, SomeState,
            SomeOutPut> mapGroupsWithStateFunc =
            (groupKey, dataIterator, groupState) ->
            {
                // Spark only learns whether a state has timed out when a trigger fires
                if (groupState.hasTimedOut())
                {
                    groupState.remove();
                    // mapGroupsWithState must return a value even on timeout;
                    // mark it invalid so it is filtered out downstream
                    return new SomeOutPut();
                }
    
                SomeState state = groupState.exists() ? groupState.get() : new SomeState();
                // ...
                state.addData(dataIterator);
                // ...
                groupState.update(state);
                groupState.setTimeoutTimestamp(System.currentTimeMillis(), delayThreshold);
                return new SomeOutPut();
            };
    Dataset<SomeOutPut> d = dataset.withWatermark("timestamp", delayThreshold)
            .groupByKey(SomeData::getKey, Encoders.LONG())
            .mapGroupsWithState(
                    mapGroupsWithStateFunc,
                    Encoders.bean(SomeState.class),
                    Encoders.bean(SomeOutPut.class),
                    GroupStateTimeout.EventTimeTimeout());
    
    // Filter out the invalid results emitted by timeout triggers
    // (see the flatMapGroupsWithState sketch after this listing)
    StreamingQuery query = d.filter(SomeOutPut::isValid)
            .toJSON()
            .writeStream()
            .outputMode("update")
            .option("checkpointLocation", "hdfs://host:port/checkpoints")
            .format("kafka")
            .option("kafka.bootstrap.servers", "_")
            .option("topic", "topic")
            .start();
    
    query.awaitTermination();
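
    A possible refinement not in the original post (a sketch reusing the same placeholder types): flatMapGroupsWithState returns an iterator per group, so a timed-out group can simply emit nothing, which makes the isValid filter unnecessary:

    import java.util.Collections;
    import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
    import org.apache.spark.sql.streaming.GroupStateTimeout;
    import org.apache.spark.sql.streaming.OutputMode;

    FlatMapGroupsWithStateFunction<Long, SomeData, SomeState, SomeOutPut> flatFunc =
        (groupKey, dataIterator, groupState) ->
        {
            if (groupState.hasTimedOut())
            {
                groupState.remove();
                return Collections.emptyIterator(); // emit nothing on timeout
            }
            SomeState state = groupState.exists() ? groupState.get() : new SomeState();
            state.addData(dataIterator);
            groupState.update(state);
            groupState.setTimeoutTimestamp(System.currentTimeMillis(), delayThreshold);
            return Collections.singletonList(new SomeOutPut()).iterator();
        };

    Dataset<SomeOutPut> results = dataset.withWatermark("timestamp", delayThreshold)
        .groupByKey(SomeData::getKey, Encoders.LONG())
        .flatMapGroupsWithState(
            flatFunc,
            OutputMode.Update(),
            Encoders.bean(SomeState.class),
            Encoders.bean(SomeOutPut.class),
            GroupStateTimeout.EventTimeTimeout());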
    
    
