美文网首页
Structured Streaming 官方例子

Structured Streaming 官方例子

作者: 焉知非鱼 | 来源:发表于2018-09-30 00:32 被阅读79次
    package ohmysummer
    
    import java.sql.Timestamp
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming._
    
    
    /**
      * Sessionizes words received as UTF-8, '\n'-delimited text from a TCP socket.
      *
      * Every space-separated word on an input line is treated as one event whose session id
      * is the word itself. For each session the query tracks the number of events and the
      * span between its first and last event timestamp, and emits a [[SessionUpdate]] per
      * trigger. A session that receives no new events for 10 seconds of processing time is
      * expired and reported one last time with `expired = true`.
      *
      * Usage: StructuredSessionization <hostname> <port>
      * <hostname> and <port> describe the TCP server that Structured Streaming
      * would connect to receive data.
      *
      * To run this on your local machine, first start a Netcat server
      * `$ nc -lk 9999`
      * and then run this object's `main` with the arguments `localhost 9999`.
      * (The upstream Spark copy of this example is launched with
      * `bin/run-example sql.streaming.StructuredSessionization localhost 9999`.)
      *
      * Note: the master is hard-coded to `local[2]` below; a master set in code takes
      * precedence over a `--master` flag passed to spark-submit.
      */
    object StructuredSessionization {

      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: StructuredSessionization <hostname> <port>")
          System.exit(1)
        }

        val host = args(0)
        val port = args(1).toInt

        val spark = SparkSession
          .builder
          .master("local[2]")
          .appName("StructuredSessionization")
          .getOrCreate()

        import spark.implicits._

        // Stream of input lines from host:port. `includeTimestamp` adds a timestamp column,
        // which is why each row is decoded below as (String, Timestamp).
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", true)
          .load()

        // Split lines into words and treat each word as the sessionId of one event.
        // Empty tokens (blank lines, repeated spaces) are dropped; otherwise they would
        // all be grouped into a spurious "" session.
        val events = lines
          .as[(String, Timestamp)]
          .flatMap { case (line, timestamp) =>
            line.split(" ").filter(_.nonEmpty).map(word => Event(sessionId = word, timestamp))
          }

        // Sessionize the events: track the event count and the start/end timestamps of each
        // session, and report a session update for every group invoked in a trigger.
        val sessionUpdates = events
          .groupByKey(event => event.sessionId)
          .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

          case (sessionId: String, sessionEvents: Iterator[Event], state: GroupState[SessionInfo]) =>

            if (state.hasTimedOut) {
              // Session expired: emit its final update, then drop the state.
              val finalUpdate =
                SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
              state.remove()
              finalUpdate
            } else {
              // Materialize the one-shot iterator once; the timestamps are read several times.
              // NOTE(review): min/max assume a non-timed-out invocation always carries at least
              // one event (the GroupState contract for mapGroupsWithState) — confirm on upgrade.
              val timestamps = sessionEvents.map(_.timestamp.getTime).toList
              val updatedSession = if (state.exists) {
                val oldSession = state.get
                SessionInfo(
                  oldSession.numEvents + timestamps.size,
                  // min/max merge keeps the span correct even if events arrive out of order.
                  math.min(oldSession.startTimestampMs, timestamps.min),
                  math.max(oldSession.endTimestampMs, timestamps.max))
              } else {
                SessionInfo(timestamps.size, timestamps.min, timestamps.max)
              }
              state.update(updatedSession)

              // Expire the session if no data is received for 10 seconds (processing time).
              state.setTimeoutDuration("10 seconds")
              SessionUpdate(sessionId, updatedSession.durationMs, updatedSession.numEvents, expired = false)
            }
        }

        // Start the query, printing the updates returned in each trigger to the console.
        val query = sessionUpdates
          .writeStream
          .outputMode("update")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }
    /**
      * User-defined type representing a single input event: one word read from the stream.
      *
      * @param sessionId the word itself; it is used as the key of the session the event joins
      * @param timestamp the timestamp attached to the input line the word came from
      */
    case class Event(
        sessionId: String,
        timestamp: Timestamp)
    
    /**
      * User-defined type holding per-session information; used as the state of
      * `mapGroupsWithState`.
      *
      * @param numEvents        total number of events received for the session
      * @param startTimestampMs timestamp (epoch millis) of the first event of the session
      * @param endTimestampMs   timestamp (epoch millis) of the last event received before expiry
      */
    case class SessionInfo(
        numEvents: Int,
        startTimestampMs: Long,
        endTimestampMs: Long) {

      /** Length of the session, in milliseconds, measured from its first to its last event. */
      def durationMs: Long =
        endTimestampMs - startTimestampMs
    }
    
    /**
      * User-defined type for one update row returned by the `mapGroupsWithState` function.
      *
      * @param id         id of the session (the word that forms it)
      * @param durationMs time in milliseconds between the session's first and last event
      *                   (not until its expiry)
      * @param numEvents  number of events received while the session was active
      * @param expired    `true` for the final update of a timed-out session, `false` while active
      */
    case class SessionUpdate(
        id: String,
        durationMs: Long,
        numEvents: Int,
        expired: Boolean)
    

    输出(控制台 update 模式,节选):

    +-----+----------+---------+-------+
    |   id|durationMs|numEvents|expired|
    +-----+----------+---------+-------+
    |hello|         0|        1|  false|
    |world|        43|        2|   true|
    +-----+----------+---------+-------+
    
    -------------------------------------------
    Batch: 14
    -------------------------------------------
    +-----+----------+---------+-------+
    |   id|durationMs|numEvents|expired|
    +-----+----------+---------+-------+
    |hello|         0|        1|   true|
    |world|         0|        1|  false|
    +-----+----------+---------+-------+
    
    -------------------------------------------
    Batch: 15
    -------------------------------------------
    +-----+----------+---------+-------+
    |   id|durationMs|numEvents|expired|
    +-----+----------+---------+-------+
    |world|        23|        2|  false|
    +-----+----------+---------+-------+
    

    相关文章

      网友评论

          本文标题:Structured Streaming 官方例子

          本文链接:https://www.haomeiwen.com/subject/yzjioftx.html