GroupState (mapGroupsWithState/f

Author: 海边的贝壳林 | Published: 2019-10-28 18:02

Wrapper class for interacting with per-group state data in mapGroupsWithState and flatMapGroupsWithState operations on KeyValueGroupedDataset.

Detailed description of the [map/flatMap]GroupsWithState operations.

Both mapGroupsWithState and flatMapGroupsWithState in KeyValueGroupedDataset invoke the user-given function on each group (defined by the grouping function in Dataset.groupByKey()) while maintaining user-defined per-group state between invocations. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked for each group repeatedly, trigger after trigger.
That is, in every batch of the StreamingQuery, the function is invoked once for each group that has data in the trigger. Furthermore, if a timeout is set, then the function is also invoked on timed-out groups (more detail below).

The function is invoked with the following parameters.

  • The key of the group.
  • An iterator containing all the values for this group.
  • A user-defined state object set by previous invocations of the given function.

In the case of a batch Dataset, there is only one invocation, and the state object will be empty as there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is equivalent to [map/flatMap]Groups, and any updates to the state and/or timeouts have no effect.

The major difference between mapGroupsWithState and flatMapGroupsWithState is that the former allows the function to return one and only one record, whereas the latter allows the function to return any number of records (including no records). Furthermore, flatMapGroupsWithState is associated with an operation output mode, which can be either Append or Update. Semantically, this defines whether the output records of one trigger effectively replace the previously output records (from previous triggers) or are appended to the list of previously output records. Essentially, this defines how the Result Table (refer to the semantics in the programming guide) is updated, and allows us to reason about the semantics of later operations.

Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState).

  • In a trigger, the function will be called only for the groups present in the batch. So do not assume that the function will be called in every trigger for every group that has state.
  • There is no guaranteed ordering of values in the iterator in the function, neither with batch, nor with streaming Datasets.
  • All the data will be shuffled before applying the function.
  • If timeout is set, then the function will also be called with no values.
    See more details on GroupStateTimeout below.
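
The first point above can be illustrated with a small plain-Java simulation. This is only a sketch under stated assumptions: ToyStatefulTriggers, runTrigger and the running-count state are hypothetical names invented for illustration, and nothing here is Spark code. Each call to runTrigger plays the role of one trigger; the per-group function body runs only for keys that appear in that batch, while state for absent keys simply stays in the store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-group state across triggers; NOT Spark code. */
final class ToyStatefulTriggers {
    // Per-group state that survives between triggers (here: a running row count per key).
    private final Map<String, Integer> stateStore = new TreeMap<>();

    /** Processes one micro-batch and returns one output record per group present in it. */
    List<String> runTrigger(List<Map.Entry<String, Integer>> batch) {
        // Group the rows of this batch by key, in the spirit of groupByKey.
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> row : batch) {
            groups.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        List<String> output = new ArrayList<>();
        // The "function" runs only for groups that have data in this batch.
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int previous = stateStore.getOrDefault(group.getKey(), 0);
            int updated = previous + group.getValue().size();
            stateStore.put(group.getKey(), updated);
            output.add(group.getKey() + "=" + updated);
        }
        return output;
    }

    public static void main(String[] args) {
        ToyStatefulTriggers query = new ToyStatefulTriggers();
        // Trigger 1: groups "a" and "b" both have data.
        System.out.println(query.runTrigger(
            List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3))));
        // Trigger 2: only "b" has data, so "a" is not visited although it has state.
        System.out.println(query.runTrigger(List.of(Map.entry("b", 7))));
    }
}
```

In the second trigger the output contains only the record for "b"; the count kept for "a" is untouched and would be picked up again the next time "a" has data.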

Important points to note about using GroupState.

  • The value of the state cannot be null. So updating state with null will throw IllegalArgumentException.
  • Operations on GroupState are not thread-safe. This is to avoid memory barriers.
  • If remove() is called, then exists() will return false,
    get() will throw NoSuchElementException, and getOption() will return None.
  • After that, if update(newState) is called, then exists() will again return true,
    and get() and getOption() will return the updated value.
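
The state lifecycle described in the points above can be sketched as a tiny stand-in class. ToyGroupState is a hypothetical name and is not the Spark class: it uses java.util.Optional where Spark's getOption() returns a Scala Option, and it leaves out timeouts entirely. It only mirrors the exists/get/getOption/update/remove contract stated in the list.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

/** Toy stand-in mirroring the documented GroupState contract; NOT the Spark class. */
final class ToyGroupState<S> {
    private S value; // null means "no state defined"

    boolean exists() {
        return value != null;
    }

    S get() {
        if (value == null) {
            throw new NoSuchElementException("state is not defined or has been removed");
        }
        return value;
    }

    // Assumption: Optional stands in for the Scala Option returned by Spark.
    Optional<S> getOption() {
        return Optional.ofNullable(value);
    }

    void update(S newState) {
        if (newState == null) {
            throw new IllegalArgumentException("state cannot be null");
        }
        value = newState;
    }

    void remove() {
        value = null;
    }

    public static void main(String[] args) {
        ToyGroupState<Integer> state = new ToyGroupState<>();
        state.update(1);
        state.remove();
        System.out.println(state.exists());    // false
        System.out.println(state.getOption()); // Optional.empty
        state.update(2);
        System.out.println(state.get());       // 2
    }
}
```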

Important points to note about using GroupStateTimeout.

  • The timeout type is a global param across all the groups (set as the timeout param in [map|flatMap]GroupsWithState), but the exact timeout duration/timestamp is configurable per group by calling setTimeout...() in GroupState.
  • Timeouts can be either based on processing time (i.e.
    GroupStateTimeout.ProcessingTimeTimeout) or event time (i.e.
    GroupStateTimeout.EventTimeTimeout).
  • With ProcessingTimeTimeout, the timeout duration can be set by calling GroupState.setTimeoutDuration. The timeout will occur when the clock has advanced by the set duration. Guarantees provided by this timeout with a duration of D ms are as follows:
    • Timeout will never occur before the clock time has advanced by D ms.
    • Timeout will occur eventually when there is a trigger in the query (i.e. after D ms). So there is no strict upper bound on when the timeout would occur.
      For example, the trigger interval of the query will affect when the timeout actually occurs.
      If there is no data in the stream (for any group) for a while, then there will not be any trigger, and the timeout function call will not occur until there is data.
    • Since the processing time timeout is based on the clock time, it is affected by variations in the system clock (e.g. time zone changes, clock skew, etc.).
  • With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark(). With this setting, data that is older than the watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp. You can control the timeout delay by two parameters:
    (i) the watermark delay and (ii) an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering). Guarantees provided by this timeout are as follows:
    • Timeout will never occur before the watermark has exceeded the set timeout.
    • Similar to processing time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.
  • When the timeout occurs for a group, the function is called for that group with no values, and with GroupState.hasTimedOut() returning true.
  • The timeout is reset every time the function is called on a group, that is, when the group has new data or the group has timed out. So the user has to set the timeout duration every time the function is called; otherwise there will not be any timeout set.
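
The processing-time guarantees and the reset rule above can be simulated in a few lines of plain Java. This is a toy model, not Spark's implementation: ToyProcessingTimeTimeout and onTrigger are hypothetical names, time is passed in explicitly as milliseconds, and treating "clock equals deadline" as already expired is an assumption of the model. The sketch shows that a timeout fires only when a trigger happens at or after the deadline (so it may fire arbitrarily late), and that it is cleared whenever the function is called.

```java
/** Toy model of a per-group processing-time timeout; NOT Spark's implementation. */
final class ToyProcessingTimeTimeout {
    private static final long NO_TIMEOUT = -1L;
    private long deadlineMs = NO_TIMEOUT;

    /** In the spirit of setTimeoutDuration: deadline = current clock + duration. */
    void setTimeoutDuration(long nowMs, long durationMs) {
        deadlineMs = nowMs + durationMs;
    }

    /**
     * Simulates one trigger at clock time nowMs.
     * Returns true if the function would be called for this group as a timeout.
     */
    boolean onTrigger(long nowMs, boolean groupHasData) {
        // Assumption of this model: the boundary case nowMs == deadlineMs counts as expired.
        boolean timedOut = !groupHasData && deadlineMs != NO_TIMEOUT && nowMs >= deadlineMs;
        if (groupHasData || timedOut) {
            // The function is called, so the timeout is reset until it is set again.
            deadlineMs = NO_TIMEOUT;
        }
        return timedOut;
    }

    public static void main(String[] args) {
        ToyProcessingTimeTimeout group = new ToyProcessingTimeTimeout();
        group.setTimeoutDuration(0L, 1_000L);
        System.out.println(group.onTrigger(500L, false));   // false: D ms have not elapsed
        System.out.println(group.onTrigger(5_000L, false)); // true: first trigger after the deadline
        System.out.println(group.onTrigger(9_000L, false)); // false: timeout was not set again
    }
}
```

Under the same toy model, an event-time timeout would compare the deadline against the current watermark instead of the clock reading, which is why it can advance only when new data moves the watermark forward.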

Scala example of using GroupState:

 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

   if (state.hasTimedOut) {                // If called when timing out, remove the state
     state.remove()

   } else if (state.exists) {              // If state exists, use it for processing
     val existingState = state.get         // Get the existing state
     val shouldRemove = ...                // Decide whether to remove the state
     if (shouldRemove) {
       state.remove()                      // Remove the state

     } else {
       val newState = ...
       state.update(newState)              // Set the new state
       state.setTimeoutDuration("1 hour")  // Set the timeout
     }

   } else {
     val initialState = ...
     state.update(initialState)            // Set the initial state
     state.setTimeoutDuration("1 hour")    // Set the timeout
   }
   ...
   // return something
 }

 dataset
   .groupByKey(...)
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)

Java example of using GroupState:

 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {

      @Override
      public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
        if (state.hasTimedOut()) {            // If called when timing out, remove the state
          state.remove();

        } else if (state.exists()) {            // If state exists, use it for processing
          int existingState = state.get();      // Get the existing state
          boolean shouldRemove = ...;           // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                     // Remove the state

          } else {
            int newState = ...;
            state.update(newState);             // Set the new state
            state.setTimeoutDuration("1 hour"); // Set the timeout
          }

        } else {
          int initialState = ...;
          state.update(initialState);           // Set the initial state
          state.setTimeoutDuration("1 hour");   // Set the timeout
        }
        ...
         // return something
      }
    };

 dataset
     .groupByKey(...)
     .mapGroupsWithState(
         mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());

Original:
org.apache.spark.sql.streaming.GroupState#scala-doc
