美文网首页
Structured Streaming筛选出需要的列

Structured Streaming筛选出需要的列

作者: SunnyMore | 来源:发表于2018-07-11 15:36 被阅读4次

    从Structured Streaming的Dataframe中选取列,有以下几种方式:

    1. df.select("columnNameStr"),eg.
    val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
          .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
          .select("subsId", "durationForDay")
    
    1. df.select(df("columnNameStr")), eg.
    val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
          .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
        val subsHourDevice = hourDevice.select(hourDevice("subsId"), hourDevice("durationForHour"))
    
        val queryHour = subsHourDevice.writeStream
          .outputMode("update")
          .option("truncate", "false")
          .format("console")
          .start()
    
    1. df.drop("columnNameStr"),通过drop删除不需要的列

    完整的示例如下:

    package com.spark.sunny.structuredstreaming
    
    import com.spark.sunny.util.UdfUtil
    import org.apache.spark.sql.{Encoders, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    
    case class CdrDto(eventSrcId : String, city : String, billingCycleId : String, subsId : String, custId : String,
                      acctId : String, billingNumber : String, usage : String, duration : String, eventBeginTime : String,
                      cellId : String, roamArea : String, pdpIndex : String, serviceType : String, imsi : String, imei : String
                     )
    /**
      * <Description> <br>
      *
      * @author Sunny<br>
      * @taskId: <br>
      * @version 1.0<br>
      * @createDate 2018/06/19 19:45 <br>
      * @see com.whalecloud.iot.cmp.streaming.receiver <br>
      */
    object CdrReceiver {
      def main(args: Array[String]): Unit = {
    //    val jdbcHostname = "10.45.82.76"
    //    val jdbcPort = 3306
    //    val jdbcDatabase ="cmpcc"
    //    val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"
    //    val jdbcUsername = "dcv"
    //    val jdbcPassword = "DCVsmart$123"
    //    val driverClass = "com.mysql.jdbc.Driver"
    //    import java.util.Properties
    //    val connectionProperties = new Properties()
    //
    //    connectionProperties.put("user", s"${jdbcUsername}")
    //    connectionProperties.put("password", s"${jdbcPassword}")
    //    connectionProperties.setProperty("Driver", driverClass)
    
        val spark = SparkSession
            .builder()
            .appName("cmp-streaming")
            .master("local")
            .getOrCreate()
    
    //    val iotSubs = spark.read.jdbc(jdbcUrl, "iot_subs", connectionProperties)
    //    iotSubs.show()
    
        import spark.implicits._
    
        val schema = Encoders.product[CdrDto].schema
        val lines =  spark
          .readStream
          .format("json")
          .schema(schema)
          .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")
    
        val beginTimeCdr = lines
          .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyy-MM-dd HH:mm:ss")))
          .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 13))
          .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 10))
    
        val hourCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginHour", "1 hour", "1 hour"),$"serviceType")
          .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")
    
        val queryHour = hourCdr.writeStream
          .outputMode("update")
          .option("truncate", "false")
          .format("console")
          .start()
    
        val dayCdr = beginTimeCdr.groupBy($"subsId",window($"eventBeginDay", "1 day", "1 day"),$"serviceType")
          .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")
    
        val queryDay = dayCdr.writeStream
          .outputMode("update")
          .option("truncate", "false")
          .format("console")
          .start()
    
        queryHour.awaitTermination()
        queryDay.awaitTermination()
    
      }
    }
    
    

    相关文章

      网友评论

          本文标题:Structured Streaming筛选出需要的列

          本文链接:https://www.haomeiwen.com/subject/jfmhpftx.html