美文网首页
Structured Streaming筛选出需要的列

Structured Streaming筛选出需要的列

作者: SunnyMore | 来源:发表于2018-07-11 15:36 被阅读4次

从Structured Streaming的Dataframe中选取列,有以下几种方式:

  1. df.select("columnNameStr"),eg.
val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
      .select("subsId", "durationForDay")
  1. df.select(df("columnNameStr")), eg.
val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
    val subsHourDevice = hourDevice.select(hourDevice("subsId"), hourDevice("durationForHour"))

    val queryHour = subsHourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()
  1. df.drop("columnNameStr"),通过drop删除不需要的列

完整的示例如下:

package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

case class CdrDto(eventSrcId : String, city : String, billingCycleId : String, subsId : String, custId : String,
                  acctId : String, billingNumber : String, usage : String, duration : String, eventBeginTime : String,
                  cellId : String, roamArea : String, pdpIndex : String, serviceType : String, imsi : String, imei : String
                 )
/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/19 19:45 <br>
  * @see com.whalecloud.iot.cmp.streaming.receiver <br>
  */
object CdrReceiver {
  def main(args: Array[String]): Unit = {
//    val jdbcHostname = "10.45.82.76"
//    val jdbcPort = 3306
//    val jdbcDatabase ="cmpcc"
//    val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
//    val jdbcUsername = "dcv"
//    val jdbcPassword = "DCVsmart$123"
//    val driverClass = "com.mysql.jdbc.Driver"
//    import java.util.Properties
//    val connectionProperties = new Properties()
//
//    connectionProperties.put("user", s"${jdbcUsername}")
//    connectionProperties.put("password", s"${jdbcPassword}")
//    connectionProperties.setProperty("Driver", driverClass)

    val spark = SparkSession
        .builder()
        .appName("cmp-streaming")
        .master("local")
        .getOrCreate()

//    val iotSubs = spark.read.jdbc(jdbcUrl, "iot_subs", connectionProperties)
//    iotSubs.show()

    import spark.implicits._

    val schema = Encoders.product[CdrDto].schema
    val lines =  spark
      .readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")

    val beginTimeCdr = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyy-MM-dd HH:mm:ss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 13))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 10))

    val hourCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginHour", "1 hour", "1 hour"),$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")

    val queryHour = hourCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    val dayCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginDay", "1 day", "1 day"),$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")

    val queryDay = dayCdr.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    queryHour.awaitTermination()
    queryDay.awaitTermination()

  }
}

相关文章

网友评论

      本文标题:Structured Streaming筛选出需要的列

      本文链接:https://www.haomeiwen.com/subject/jfmhpftx.html