美文网首页
Spark Structured Streaming 解析 JSON

Spark Structured Streaming 解析 JSON

作者: 焉知非鱼 | 来源:发表于2018-09-14 22:22 被阅读584次

    Producer

    发送 JSON 数据到 Kafka:

    from confluent_kafka import Producer
    
    # Producer connected to the local Kafka broker that the Spark job consumes from.
    p = Producer({'bootstrap.servers': 'localhost:9092'})

    def delivery_report(err, msg):
        """Print the delivery result of one produced message.

        Called once for each message produced; triggered by poll() or flush().

        :param err: delivery error, or None on success.
        :param msg: the delivered message (topic/partition are printed on success).
        """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    # The whole JSON document is sent as a single Kafka message. Read it as raw bytes so it is
    # forwarded unchanged instead of being decoded with the platform's default text encoding
    # and then re-encoded as UTF-8.
    with open("/Users/ohmycloud/bigdata/Spark/camelia.json", "rb") as f:
        data = f.read()

    # Serve delivery callbacks of any earlier produce() calls before queueing a new message.
    p.poll(0)
    p.produce('net-logs', data, callback=delivery_report)

    # Block until every queued message is delivered (or fails) and its callback has run.
    p.flush()
    

    JSON 数据内容如下:

    {
        "metadata":{
            "access_token":"c.FmDPkzyzaQe...",
            "client_version":1
        },
        "devices":{
            "thermostats":{
                "peyiJNo0IldT2YlIVtYaGQ":{
                    "device_id":"peyiJNo0IldT2YlIVtYaGQ",
                    "locale":"en-US",
                    "software_version":"4.0",
                    "structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
                    "name":"Hallway (upstairs)",
                    "name_long":"Hallway Thermostat (upstairs)",
                    "last_connection":"2016-10-31T23:59:59.000Z",
                    "is_online":true,
                    "can_cool":true,
                    "can_heat":true,
                    "is_using_emergency_heat":true,
                    "has_fan":true,
                    "fan_timer_active":true,
                    "fan_timer_timeout":"2016-10-31T23:59:59.000Z",
                    "has_leaf":true,
                    "temperature_scale":"C",
                    "target_temperature_f":72,
                    "target_temperature_c":21.5,
                    "target_temperature_high_f":80,
                    "target_temperature_high_c":24.5,
                    "target_temperature_low_f":65,
                    "target_temperature_low_c":19.5,
                    "eco_temperature_high_f":80,
                    "eco_temperature_high_c":24.5,
                    "eco_temperature_low_f":65,
                    "eco_temperature_low_c":19.5,
                    "away_temperature_high_f":80,
                    "away_temperature_high_c":24.5,
                    "away_temperature_low_f":65,
                    "away_temperature_low_c":19.5,
                    "hvac_mode":"heat",
                    "previous_hvac_mode":"heat",
                    "ambient_temperature_f":72,
                    "ambient_temperature_c":21.5,
                    "humidity":40,
                    "hvac_state":"heating",
                    "where_id":"UNCBGUnN24...",
                    "is_locked":true,
                    "locked_temp_min_f":65,
                    "locked_temp_max_f":80,
                    "locked_temp_min_c":19.5,
                    "locked_temp_max_c":24.5,
                    "label":"Pat's room",
                    "where_name":"Hallway",
                    "sunlight_correction_enabled":true,
                    "sunlight_correction_active":true,
                    "fan_timer_duration":"15",
                    "time_to_target":"~15",
                    "time_to_target_training":"training"
                }
            },
            "smoke_co_alarms":{
                "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{
                    "device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
                    "locale":"en-US",
                    "software_version":"1.01",
                    "structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
                    "name":"Hallway (upstairs)",
                    "name_long":"Hallway Protect (upstairs)",
                    "last_connection":"2016-10-31T23:59:59.000Z",
                    "is_online":true,
                    "battery_health":"ok",
                    "co_alarm_state":"ok",
                    "smoke_alarm_state":"ok",
                    "is_manual_test_active":true,
                    "last_manual_test_time":"2016-10-31T23:59:59.000Z",
                    "ui_color_state":"gray",
                    "where_id":"UNCBGUnN24...",
                    "where_name":"Hallway"
                }
            },
            "cameras":{
                "awJo6rH0IldT2YlIVtYaGQ":{
                    "device_id":"awJo6rH...",
                    "software_version":"4.0",
                    "structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
                    "where_id":"d6reb_OZTM...",
                    "where_name":"Hallway",
                    "name":"Hallway (upstairs)",
                    "name_long":"Hallway Camera (upstairs)",
                    "is_online":true,
                    "is_streaming":true,
                    "is_audio_input_enabled":true,
                    "last_is_online_change":"2016-12-29T18:42:00.000Z",
                    "is_video_history_enabled":true,
                    "web_url":"https://home.nest.com/cameras/device_id?auth=access_token",
                    "app_url":"nestmobile://cameras/device_id?auth=access_token",
                    "is_public_share_enabled":true,
                    "activity_zones":[
                        {
                            "name":"Walkway",
                            "id":"244083"
                        },
                        {
                            "name":"Walkway2",
                            "id2":"244084"
                        }
                    ],
                    "public_share_url":"https://video.nest.com/live/STRING1?STRING2",
                    "snapshot_url":"STRING1/device_id/STRING2?auth=access_token",
                    "last_event":{
                        "has_sound":true,
                        "has_motion":true,
                        "has_person":true,
                        "start_time":"2016-12-29T00:00:00.000Z",
                        "end_time":"2016-12-29T18:42:00.000Z",
                        "urls_expire_time":"2016-10-31T23:59:59.000Z",
                        "web_url":"https://home.nest.com/cameras/device_id/cuepoints/STRING?auth=access_token",
                        "app_url":"nestmobile://cameras/device_id/cuepoints/STRING?auth=access_token",
                        "image_url":"STRING1/device_id/STRING2?auth=access_token",
                        "animated_image_url":"STRING1/device_id/STRING2?auth=access_token",
                        "activity_zone_ids":[
                            "244083",
                            "244084"
                        ]
                    }
                }
            }
        },
        "structures":{
            "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw":{
                "structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
                "thermostats":[
                    "peyiJNo0IldT2YlIVtYaGQ",
                    "ggfgfdgrgrgg"
                ],
                "smoke_co_alarms":[
                    "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
                    "ddddddddddd"
                ],
                "cameras":[
                    "awJo6r...",
                    "ddddddssssssss"
                ],
                "away":"home",
                "name":"Home",
                "country_code":"US",
                "postal_code":"94304",
                "peak_period_start_time":"2016-10-31T23:59:59.000Z",
                "peak_period_end_time":"2016-10-31T23:59:59.000Z",
                "time_zone":"America/Los_Angeles",
                "eta":{
                    "trip_id":"myTripHome1024",
                    "estimated_arrival_window_begin":"2016-10-31T22:42:59.000Z",
                    "estimated_arrival_window_end":"2016-10-31T23:59:59.000Z"
                },
                "eta_begin":"2016-08-04T13:21:37-07:00",
                "co_alarm_state":"ok",
                "smoke_alarm_state":"ok",
                "rhr_enrollment":true,
                "wwn_security_state":"ok",
                "wheres":{
                    "Fqp6wJI...":{
                        "where_id":"Fqp6wJI...",
                        "name":"Bedroom"
                    }
                }
            }
        }
    }
    

    代码

    package wmdevice
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types._
    
    /**
     * Streams Nest-style device snapshots (one JSON document per Kafka message) from Kafka,
     * parses them with an explicit schema and prints, for every camera, its device id, whether
     * its last event detected a person, and when that event started.
     *
     * Usage: `CamerasStructedStreaming [bootstrapServers] [topic]`
     * (defaults: `localhost:9092`, `net-logs`).
     */
    object CamerasStructedStreaming {

      /** Kafka brokers used when no first program argument is given. */
      private val DefaultBootstrapServers = "localhost:9092"

      /** Kafka topic subscribed to when no second program argument is given. */
      private val DefaultTopic = "net-logs"

      /**
       * Timestamp layout of the sample data, e.g. `2016-10-31T23:59:59.000Z`.
       *
       * `SSS` is the fraction of a second (lower-case `sss` would be the seconds field again).
       * `XXX` parses the trailing `Z` / `+hh:mm` as a zone offset; a quoted `'Z'` literal would
       * be ignored, so UTC values would be read in the session time zone.
       */
      private val NestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

      /** One entry of `devices.thermostats` (the map is keyed by device id in the sample). */
      private val ThermostatSchema: StructType = new StructType()
        .add("device_id", StringType)
        .add("locale", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("name", StringType)
        .add("name_long", StringType)
        .add("last_connection", TimestampType)
        .add("is_online", BooleanType)
        .add("can_cool", BooleanType)
        .add("can_heat", BooleanType)
        .add("is_using_emergency_heat", BooleanType)
        .add("has_fan", BooleanType)
        .add("fan_timer_active", BooleanType)
        .add("fan_timer_timeout", TimestampType)
        .add("has_leaf", BooleanType)
        .add("temperature_scale", StringType)
        // Numeric readings are declared as StringType.
        .add("target_temperature_f", StringType)
        .add("target_temperature_c", StringType)
        .add("target_temperature_high_f", StringType)
        .add("target_temperature_high_c", StringType)
        .add("target_temperature_low_f", StringType)
        .add("target_temperature_low_c", StringType)
        .add("eco_temperature_high_f", StringType)
        .add("eco_temperature_high_c", StringType)
        .add("eco_temperature_low_f", StringType)
        .add("eco_temperature_low_c", StringType)
        .add("away_temperature_high_f", StringType)
        .add("away_temperature_high_c", StringType)
        .add("away_temperature_low_f", StringType)
        .add("away_temperature_low_c", StringType)
        .add("hvac_mode", StringType)
        .add("previous_hvac_mode", StringType)
        .add("ambient_temperature_f", StringType)
        .add("ambient_temperature_c", StringType)
        .add("humidity", StringType)
        .add("hvac_state", StringType)
        .add("where_id", StringType)
        .add("is_locked", BooleanType)
        .add("locked_temp_min_f", StringType)
        .add("locked_temp_max_f", StringType)
        .add("locked_temp_min_c", StringType)
        .add("locked_temp_max_c", StringType)
        .add("label", StringType)
        .add("where_name", StringType)
        .add("sunlight_correction_enabled", BooleanType)
        .add("sunlight_correction_active", BooleanType)
        .add("fan_timer_duration", StringType)
        .add("time_to_target", StringType)
        .add("time_to_target_training", StringType)

      /** One entry of `devices.smoke_co_alarms`. */
      private val SmokeCoAlarmSchema: StructType = new StructType()
        .add("device_id", StringType)
        .add("locale", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("name", StringType)
        .add("name_long", StringType)
        .add("last_connection", TimestampType)
        .add("is_online", BooleanType)
        .add("battery_health", StringType)
        .add("co_alarm_state", StringType)
        .add("smoke_alarm_state", StringType)
        .add("is_manual_test_active", BooleanType)
        .add("last_manual_test_time", TimestampType)
        .add("ui_color_state", StringType)
        .add("where_id", StringType)
        .add("where_name", StringType)

      /** The `last_event` record nested in each camera. */
      private val CameraEventSchema: StructType = new StructType()
        .add("has_sound", BooleanType)
        .add("has_motion", BooleanType)
        .add("has_person", BooleanType)
        .add("start_time", TimestampType)
        .add("end_time", TimestampType)
        .add("urls_expire_time", TimestampType)
        .add("web_url", StringType)
        .add("app_url", StringType)
        .add("image_url", StringType)
        .add("animated_image_url", StringType)
        .add("activity_zone_ids", ArrayType(StringType, true))

      /** One entry of `devices.cameras`. */
      private val CameraSchema: StructType = new StructType()
        .add("device_id", StringType)
        .add("software_version", StringType)
        .add("structure_id", StringType)
        .add("where_id", StringType)
        .add("where_name", StringType)
        .add("name", StringType)
        .add("name_long", StringType)
        .add("is_online", BooleanType)
        .add("is_streaming", BooleanType)
        .add("is_audio_input_enabled", BooleanType)
        .add("last_is_online_change", TimestampType)
        .add("is_video_history_enabled", BooleanType)
        .add("web_url", StringType)
        .add("app_url", StringType)
        .add("is_public_share_enabled", BooleanType)
        // Only `name` and `id` are declared; the second zone in the sample carries `id2`
        // instead, so its `id` is absent.
        .add("activity_zones", ArrayType(new StructType().add("name", StringType).add("id", StringType), true))
        .add("public_share_url", StringType)
        .add("snapshot_url", StringType)
        .add("last_event", CameraEventSchema)

      /** One entry of the top-level `structures` map. */
      private val StructureSchema: StructType = new StructType()
        .add("structure_id", StringType)
        .add("thermostats", ArrayType(StringType, true))
        .add("smoke_co_alarms", ArrayType(StringType, true))
        .add("cameras", ArrayType(StringType, true))
        .add("away", StringType)
        .add("name", StringType)
        .add("country_code", StringType)
        .add("postal_code", StringType)
        .add("peak_period_start_time", TimestampType)
        .add("peak_period_end_time", TimestampType)
        .add("time_zone", StringType)
        .add("eta", new StructType()
          .add("trip_id", StringType)
          .add("estimated_arrival_window_begin", TimestampType)
          .add("estimated_arrival_window_end", TimestampType))
        // NOTE(review): the sample value `2016-08-04T13:21:37-07:00` has no fractional
        // seconds and so does not match NestTimestampFormat — confirm how it should be parsed.
        .add("eta_begin", TimestampType)
        .add("co_alarm_state", StringType)
        .add("smoke_alarm_state", StringType)
        .add("rhr_enrollment", BooleanType)
        .add("wwn_security_state", StringType)
        .add("wheres", MapType(StringType, new StructType()
          .add("where_id", StringType)
          .add("name", StringType)))

      /** Schema of one whole Kafka message payload. */
      private val NestSchema: StructType = new StructType()
        .add("metadata", new StructType()
          .add("access_token", StringType)
          .add("client_version", IntegerType))
        .add("devices", new StructType()
          .add("thermostats", MapType(StringType, ThermostatSchema))
          .add("smoke_co_alarms", MapType(StringType, SmokeCoAlarmSchema))
          .add("cameras", MapType(StringType, CameraSchema)))
        .add("structures", MapType(StringType, StructureSchema))

      /**
       * Starts the streaming query and blocks until it terminates.
       *
       * @param args optional `[bootstrapServers] [topic]`; missing entries fall back to
       *             `localhost:9092` and `net-logs`.
       */
      def main(args: Array[String]): Unit = {
        val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
        val topic = args.lift(1).getOrElse(DefaultTopic)

        val spark = SparkSession
          .builder
          .appName("Camelia")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val jsonOptions = Map("timestampFormat" -> NestTimestampFormat)

        // The Kafka `value` column is cast to a string before being parsed as JSON.
        val parsed = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topic)
          .option("startingOffsets", "earliest")
          .load()
          .select(from_json(col("value").cast("string"), NestSchema, jsonOptions).alias("parsed_value"))

        // Exploding the `cameras` map yields one row per camera with columns `key` and `value`;
        // keep only the fields of the camera struct.
        val camera = parsed
          .select(explode($"parsed_value.devices.cameras"))
          .select("value.*")

        camera.printSchema()

        val sightings = camera
          .select("device_id", "last_event.has_person", "last_event.start_time")

        val query = sightings.writeStream
          .format("console")
          .outputMode(OutputMode.Append())
          .start()

        query.awaitTermination()
      }
    }
    

    JDBCSink

    import java.sql._
    
    import org.apache.spark.sql.{ForeachWriter, Row}
    
    /**
     * Structured Streaming `foreach` sink that inserts the first three fields of every row
     * into the MySQL table `carema`.
     *
     * A connection and a prepared statement are created in `open` and released in `close`.
     * Row values are bound as JDBC parameters instead of being concatenated into the SQL text.
     * This quotes string and timestamp columns (e.g. `device_id`, `start_time`) correctly, and
     * row content cannot alter the statement.
     *
     * NOTE(review): the table name `carema` looks like a typo for `camera`; it is kept as-is
     * because it must match the existing database table — confirm.
     *
     * @param url  JDBC URL of the target database
     * @param user database user name
     * @param pwd  database password
     */
    class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
      val driver = "com.mysql.jdbc.Driver"
      var connection: Connection = _
      var statement: PreparedStatement = _

      /** Number of leading Row fields written per insert. */
      private val ColumnCount = 3

      /** Parameterized insert with one placeholder per written column. */
      private val InsertSql = "INSERT INTO carema VALUES (?, ?, ?)"

      override def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.prepareStatement(InsertSql)
        true
      }

      override def process(value: Row): Unit = {
        (0 until ColumnCount).foreach { i =>
          // JDBC parameter indices are 1-based; Row field indices are 0-based.
          statement.setObject(i + 1, value.getAs[AnyRef](i))
        }
        statement.executeUpdate()
      }

      override def close(errorOrNull: Throwable): Unit = {
        // open() may have thrown part-way, so either handle can still be null here.
        try {
          if (statement != null) statement.close()
        } finally {
          if (connection != null) connection.close()
        }
      }
    }
    

    写入数据库

        // Write the camera sightings to MySQL through JDBCSink.
        // Trigger is not among the file's streaming imports (only OutputMode is), so import it here.
        import org.apache.spark.sql.streaming.Trigger

        // Connection settings may be overridden through the environment; the defaults are the
        // local test values. Avoid committing real credentials in source.
        val url = sys.env.getOrElse(
          "MYSQL_URL",
          "jdbc:mysql://127.0.0.1:6606/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false")
        val user = sys.env.getOrElse("MYSQL_USER", "root")
        val pwd = sys.env.getOrElse("MYSQL_PWD", "111608")

        val writer = new JDBCSink(url, user, pwd)
        val query =
          sightings
            .writeStream
            .foreach(writer)
            .outputMode("update")
            .trigger(Trigger.ProcessingTime("25 seconds"))
            .start()

        query.awaitTermination()
    

    One more example

    JSON

    {
       "createTime" : 1536571926,
       "event" : {
          "frequency" : -1,
          "info" : {
             "VCU_VacmPumpSta" : {
                "diff_time" : [ 2, 11 ],
                "value" : [ 1, 0 ]
             }
          },
          "startTime" : -1
       },
       "iccid" : "89860117750045560712",
       "signal1s" : {
          "frequency" : 1,
          "info" : {
             "APA_DistToparkslot" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "APA_Process_bar" : [ 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127 ],
             "BMS_ActlExchgCurnt" : [
                10016,
                10016,
                10021,
                10021,
                10021,
                10021,
                10012,
                10012,
                10012,
                10012,
                10012,
                10007,
                10007
             ],
             "BMS_BatSOC_Actl" : [ 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355 ],
             "BMS_BatSOC_Rpt" : [ 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355, 355 ],
             "BMS_IsoResistance" : [ 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120, 120 ],
             "ESC_VehicleSpeed" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "HU_TargetSOC" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "ICU_DisplayVehicleSpeed" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "ICU_ICUTotalOdometer" : [
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252,
                3252
             ],
             "LateralACC" : [ 201, 202, 201, 201, 201, 201, 201, 202, 201, 201, 202, 200, 201 ],
             "LongitudeACC" : [ 124, 124, 124, 124, 124, 124, 124, 124, 124, 124, 124, 124, 124 ],
             "SAS_Angle" : [
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767
             ],
             "SAS_RotSpeed" : [
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767,
                32767
             ],
             "VCU_AcclPedalPos" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "VCU_ActlMotorRotateSpd" : [
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000,
                10000
             ],
             "VCU_Actl_MotorRotateSpd" : [
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384,
                16384
             ],
             "VCU_Actl_MotorTorque" : [
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000
             ],
             "VCU_BrkPedalPos" : [ 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 1, 2 ],
             "VCU_DrvRangeDistEst" : [ 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63 ],
             "VCU_DrvReq_MotorTorq" : [
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000,
                2000
             ],
             "VCU_MotorActualPower" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
             "VCU_VhclPwrCnsmpActl" : [ 209, 209, 209, 209, 209, 209, 210, 210, 210, 210, 210, 210, 210 ],
             "YRS_YawRate" : [
                18000,
                18000,
                18000,
                18000,
                18000,
                18000,
                18000,
                18000,
                18000,
                18000,
                18005,
                18000,
                18000
             ]
          },
          "startTime" : 0
       },
       "signal30s" : {
          "frequency" : 0.3333333432674408,
          "info" : {
             "AC_ActTotalPower" : [ 5 ],
             "AC_EnvirTemp" : [ 136 ],
             "AC_HVHActlPwr_HVAC" : [ 0 ],
             "AC_HVHDeviceInternTemp_HVAC" : [ 63 ],
             "AC_InAirPM25Value" : [ 36 ],
             "AC_IndoorTemp" : [ 126 ],
             "AC_OutAirQualityLevel" : [ 0 ],
             "AC_RefrgHiPressure" : [ 8 ],
             "AC_SeatHeatTemp_FL_Rsrv" : [ 0 ],
             "AC_SeatHeatTemp_FR_Rsrv" : [ 0 ],
             "BAT_HVHActlPwr" : [ -1 ],
             "BAT_HVHDeviceInternTemp" : [ -1 ],
             "BAT_HighVolDCTolalPwr" : [ -1 ],
             "BAT_PumpSpeedDutyRequest" : [ -1 ],
             "BMS_AuxHeatReqPower_Reserved" : [ 0 ],
             "BMS_BatCapacity" : [ 198 ],
             "BMS_BatSOH" : [ 1000 ],
             "BMS_BatteryDTC_Num" : [ 1 ],
             "BMS_BatteryDTC_list" : [
                [ 13698951 ]
             ],
             "BMS_BatterySubSysCod" : [ 1 ],
             "BMS_BatterySubSysNum" : [ 1 ],
             "BMS_BatteryType" : [ 6 ],
             "BMS_CellVol" : [
                [
                   3614,
                   3614,
                   3613,
                   3614,
                   3614,
                   3614,
                   3613,
                   3613,
                   3613,
                   3613,
                   3614,
                   3613,
                   3614,
                   3615,
                   3615,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3613,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3613,
                   3613,
                   3611,
                   3613,
                   3614,
                   3614,
                   3614,
                   3615,
                   3613,
                   3613,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3615,
                   3613,
                   3614,
                   3614,
                   3614,
                   3615,
                   3614,
                   3614,
                   3614,
                   3614,
                   3615,
                   3614,
                   3615,
                   3614,
                   3615,
                   3614,
                   3615,
                   3615,
                   3615,
                   3614,
                   3614,
                   3613,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3615,
                   3614,
                   3614,
                   3613,
                   3614,
                   3613,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614,
                   3614
                ]
             ],
             "BMS_CellVolAve" : [ 3613 ],
             "BMS_CellVolMax" : [ 3615 ],
             "BMS_CellVolMin" : [ 3611 ],
             "BMS_ContactorTempAve_DCCharger" : [ 70 ],
             "BMS_ContactorTempAve_Negative" : [ 73 ],
             "BMS_ContactorTempAve_Positive" : [ 72 ],
             "BMS_DCS_ActlChrgCurrent" : [ 0 ],
             "BMS_DCS_ActlChrgPower" : [ 0 ],
             "BMS_DCS_ActlChrgVoltage" : [ 0 ],
             "BMS_DC_AC_RemChrgTime" : [ 600 ],
             "BMS_ESS_InletColantActtemp" : [ 69 ],
             "BMS_ESS_InletColanttargettemp" : [ 70 ],
             "BMS_ESS_outletColantActtemp" : [ 70 ],
             "BMS_HVBatActlVoltage" : [ 3252 ],
             "BMS_HVBatCellTempAve" : [ 68 ],
             "BMS_HVBatCellTempMax" : [ 69 ],
             "BMS_HVBatCellTempMin" : [ 68 ],
             "BMS_HVcppltnTempAve" : [ 69 ],
             "BMS_HVcppltnTempMin" : [ 68 ],
             "BMS_HVcppltnTempmax" : [ 70 ],
             "BMS_MaxCellTemp" : [ 100 ],
             "BMS_MaxCellVol" : [ 4310 ],
             "BMS_MaxConDisCurnt" : [ 2000 ],
             "BMS_MaxInstanDisCurnt" : [ 3920 ],
             "BMS_MaxTempProbeCod" : [ 0 ],
             "BMS_MaxVolCellCod" : [ 14 ],
             "BMS_MinCellTemp" : [ 15 ],
             "BMS_MinCellVol" : [ 3050 ],
             "BMS_MinTempProbeCod" : [ 4 ],
             "BMS_MinVolCellCod" : [ 30 ],
             "BMS_NomCellCap" : [ 62 ],
             "BMS_PwrRecupMaxConChrgCurnt" : [ 1860 ],
             "BMS_PwrRecupMaxInstanChrgCurnt" : [ 3000 ],
             "BMS_TempProbe" : [
                [
                   89,
                   89,
                   89,
                   89,
                   88,
                   89,
                   88,
                   89,
                   88,
                   89,
                   88,
                   89,
                   89,
                   89,
                   88,
                   89,
                   88,
                   89
                ]
             ],
             "BMS_TotalCellNo" : [ 90 ],
             "BMS_TotalTempProbe" : [ 18 ],
             "GPS_Heading" : [ 0 ],
             "GPS_Latitude" : [ 0 ],
             "GPS_Longitude" : [ 0 ],
             "GPS_Speed" : [ 0 ],
             "ICU_AverageVehicleSpeed" : [ 0 ],
             "ICU_ICUTripAOdometer" : [ 0 ],
             "ICU_ICUTripBOdometer" : [ 0 ],
             "ICU_WashLiquidLevelWarn" : [ 0 ],
             "ICU_drive_time" : [ 0 ],
             "PTG_DoorOpenRatio" : [ 0 ],
             "TPMS_PressureValue_FL" : [ 255 ],
             "TPMS_PressureValue_FR" : [ 255 ],
             "TPMS_PressureValue_RL" : [ 255 ],
             "TPMS_PressureValue_RR" : [ 255 ],
             "TPMS_TireTempValue_FL" : [ 255 ],
             "TPMS_TireTempValue_FR" : [ 255 ],
             "TPMS_TireTempValue_RL" : [ 255 ],
             "TPMS_TireTempValue_RR" : [ 255 ],
             "VCU_AtmosPressure" : [ 204 ],
             "VCU_CruiseCtrTgtSpd" : [ 0 ],
             "VCU_MCU_Input_Curr" : [ 1024 ],
             "VCU_MCU_Input_Vol" : [ 648 ],
             "VCU_MCU_Temp" : [ 72 ],
             "VCU_MotorActualPower" : [ 0 ],
             "VCU_MotorDTC_Num" : [ 1 ],
             "VCU_MotorDTC_list" : [
                [ 134293412 ]
             ],
             "VCU_MotorTemp" : [ 74 ],
             "VCU_PCUInletCooltTemp" : [ 72 ],
             "VCU_PwrCoolFanSpdDuty" : [ 2 ],
             "VCU_VacuumPressure" : [ 81 ],
             "VCU_VhclPwrCnsmpAvrg" : [ 144 ]
          },
          "startTime" : 0
       },
       "tboxSn" : "VE70045900-J7U0077",
       "version" : "0.1.0.0",
       "vin" : "XXXX000000000XXXX"
    }
    

    定义 schema

    package ohmysummer.pipeline.schema
    
    import org.apache.spark.sql.types._
    
    /**
      * Spark SQL schema for the JSON sale-data records decoded with `from_json`.
      *
      * The schema is assembled declaratively from `name -> type` pairs. Every field is nullable
      * and carries no metadata. Field order matches the declaration order below.
      */
    class DcSaleData {
      import DcSaleData._

      /** Full record schema: top-level identifiers plus the `event`, `signal1s` and `signal30s` groups. */
      val schema: StructType = struct(
        "createTime" -> IntegerType,
        "iccid"      -> StringType,
        "tboxSn"     -> StringType,
        "version"    -> StringType,
        "vin"        -> StringType,

        "event" -> struct(
          "frequency" -> IntegerType,
          "info" -> struct(
            "VCU_VacmPumpSta" -> struct(
              "diff_time" -> IntArray,
              "value"     -> IntArray
            )
          ),
          "startTime" -> IntegerType
        ),

        "signal1s" -> struct(
          "frequency" -> IntegerType,
          "info" -> struct(
            "APA_DistToparkslot"      -> IntArray,
            "APA_Process_bar"         -> IntArray,
            "BMS_ActlExchgCurnt"      -> IntArray,
            "BMS_BatSOC_Actl"         -> IntArray,
            "BMS_BatSOC_Rpt"          -> IntArray,
            "BMS_IsoResistance"       -> IntArray,
            "ESC_VehicleSpeed"        -> IntArray,
            "HU_TargetSOC"            -> IntArray,
            "ICU_DisplayVehicleSpeed" -> IntArray,
            "ICU_ICUTotalOdometer"    -> IntArray,
            "LateralACC"              -> IntArray,
            "LongitudeACC"            -> IntArray,
            "SAS_Angle"               -> IntArray,
            "SAS_RotSpeed"            -> IntArray,
            "VCU_AcclPedalPos"        -> IntArray,
            "VCU_ActlMotorRotateSpd"  -> IntArray,
            "VCU_Actl_MotorRotateSpd" -> IntArray,
            "VCU_Actl_MotorTorque"    -> IntArray,
            "VCU_BrkPedalPos"         -> IntArray,
            "VCU_DrvRangeDistEst"     -> IntArray,
            "VCU_DrvReq_MotorTorq"    -> IntArray,
            "VCU_MotorActualPower"    -> IntArray,
            "VCU_VhclPwrCnsmpActl"    -> IntArray,
            "YRS_YawRate"             -> IntArray
          ),
          "startTime" -> IntegerType
        ),

        // Note: unlike the other groups, this group's frequency is declared as a float.
        "signal30s" -> struct(
          "frequency" -> FloatType,
          "info" -> struct(
            "AC_ActTotalPower"               -> IntArray,
            "AC_EnvirTemp"                   -> IntArray,
            "AC_HVHActlPwr_HVAC"             -> IntArray,
            "AC_HVHDeviceInternTemp_HVAC"    -> IntArray,
            "AC_InAirPM25Value"              -> IntArray,
            "AC_IndoorTemp"                  -> IntArray,
            "AC_OutAirQualityLevel"          -> IntArray,
            "AC_RefrgHiPressure"             -> IntArray,
            "AC_SeatHeatTemp_FL_Rsrv"        -> IntArray,
            "AC_SeatHeatTemp_FR_Rsrv"        -> IntArray,
            "BAT_HVHActlPwr"                 -> IntArray,
            "BAT_HVHDeviceInternTemp"        -> IntArray,
            "BAT_HighVolDCTolalPwr"          -> IntArray,
            "BAT_PumpSpeedDutyRequest"       -> IntArray,
            "BMS_AuxHeatReqPower_Reserved"   -> IntArray,
            "BMS_BatCapacity"                -> IntArray,
            "BMS_BatSOH"                     -> IntArray,
            "BMS_BatteryDTC_Num"             -> IntArray,
            "BMS_BatteryDTC_list"            -> IntMatrix,
            "BMS_BatterySubSysCod"           -> IntArray,
            "BMS_BatterySubSysNum"           -> IntArray,
            "BMS_BatteryType"                -> IntArray,
            "BMS_CellVol"                    -> IntMatrix,
            "BMS_CellVolAve"                 -> IntArray,
            "BMS_CellVolMax"                 -> IntArray,
            "BMS_CellVolMin"                 -> IntArray,
            "BMS_ContactorTempAve_DCCharger" -> IntArray,
            "BMS_ContactorTempAve_Negative"  -> IntArray,
            "BMS_ContactorTempAve_Positive"  -> IntArray,
            "BMS_DCS_ActlChrgCurrent"        -> IntArray,
            "BMS_DCS_ActlChrgPower"          -> IntArray,
            "BMS_DCS_ActlChrgVoltage"        -> IntArray,
            "BMS_DC_AC_RemChrgTime"          -> IntArray,
            "BMS_ESS_InletColantActtemp"     -> IntArray,
            "BMS_ESS_InletColanttargettemp"  -> IntArray,
            "BMS_ESS_outletColantActtemp"    -> IntArray,
            "BMS_HVBatActlVoltage"           -> IntArray,
            "BMS_HVBatCellTempAve"           -> IntArray,
            "BMS_HVBatCellTempMax"           -> IntArray,
            "BMS_HVBatCellTempMin"           -> IntArray,
            "BMS_HVcppltnTempAve"            -> IntArray,
            "BMS_HVcppltnTempMin"            -> IntArray,
            "BMS_HVcppltnTempmax"            -> IntArray,
            "BMS_MaxCellTemp"                -> IntArray,
            "BMS_MaxCellVol"                 -> IntArray,
            "BMS_MaxConDisCurnt"             -> IntArray,
            "BMS_MaxInstanDisCurnt"          -> IntArray,
            "BMS_MaxTempProbeCod"            -> IntArray,
            "BMS_MaxVolCellCod"              -> IntArray,
            "BMS_MinCellTemp"                -> IntArray,
            "BMS_MinCellVol"                 -> IntArray,
            "BMS_MinTempProbeCod"            -> IntArray,
            "BMS_MinVolCellCod"              -> IntArray,
            "BMS_NomCellCap"                 -> IntArray,
            "BMS_PwrRecupMaxConChrgCurnt"    -> IntArray,
            "BMS_PwrRecupMaxInstanChrgCurnt" -> IntArray,
            "BMS_TempProbe"                  -> IntMatrix,
            "BMS_TotalCellNo"                -> IntArray,
            "BMS_TotalTempProbe"             -> IntArray,
            "GPS_Heading"                    -> IntArray,
            "GPS_Latitude"                   -> IntArray,
            "GPS_Longitude"                  -> IntArray,
            "GPS_Speed"                      -> IntArray,
            "ICU_AverageVehicleSpeed"        -> IntArray,
            "ICU_ICUTripAOdometer"           -> IntArray,
            "ICU_ICUTripBOdometer"           -> IntArray,
            "ICU_WashLiquidLevelWarn"        -> IntArray,
            "ICU_drive_time"                 -> IntArray,
            "PTG_DoorOpenRatio"              -> IntArray,
            "TPMS_PressureValue_FL"          -> IntArray,
            "TPMS_PressureValue_FR"          -> IntArray,
            "TPMS_PressureValue_RL"          -> IntArray,
            "TPMS_PressureValue_RR"          -> IntArray,
            "TPMS_TireTempValue_FL"          -> IntArray,
            "TPMS_TireTempValue_FR"          -> IntArray,
            "TPMS_TireTempValue_RL"          -> IntArray,
            "TPMS_TireTempValue_RR"          -> IntArray,
            "VCU_AtmosPressure"              -> IntArray,
            "VCU_CruiseCtrTgtSpd"            -> IntArray,
            "VCU_MCU_Input_Curr"             -> IntArray,
            "VCU_MCU_Input_Vol"              -> IntArray,
            "VCU_MCU_Temp"                   -> IntArray,
            "VCU_MotorActualPower"           -> IntArray,
            "VCU_MotorDTC_Num"               -> IntArray,
            "VCU_MotorDTC_list"              -> IntMatrix,
            "VCU_MotorTemp"                  -> IntArray,
            "VCU_PCUInletCooltTemp"          -> IntArray,
            "VCU_PwrCoolFanSpdDuty"          -> IntArray,
            "VCU_VacuumPressure"             -> IntArray,
            "VCU_VhclPwrCnsmpAvrg"           -> IntArray
          ),
          "startTime" -> IntegerType
        )
      )
    }

    /** Companion holding the small schema-building vocabulary used by [[DcSaleData]]. */
    object DcSaleData {
      /** Array of ints whose elements may be null; the type used for most signal fields. */
      private val IntArray: DataType = ArrayType(IntegerType, containsNull = true)

      /** Array of nullable int arrays; used for list-valued signals (e.g. the *_DTC_list fields). */
      private val IntMatrix: DataType = ArrayType(IntArray, containsNull = true)

      /** Builds a struct from `(name, type)` pairs, in order, with every field nullable and no metadata. */
      private def struct(fields: (String, DataType)*): StructType =
        StructType(fields.map { case (fieldName, fieldType) => StructField(fieldName, fieldType, nullable = true) })
    }
    

    explode

    explode 接收的参数必须是 Array 或 Map；如果要展开的是 Struct，得先用 array() 把它包装成 Array：

    package ohmysummer
    
    import ohmysummer.pipeline.kafka.WmKafkaDeserializer
    import ohmysummer.pipeline.schema.DcSaleData
    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.functions._
    
    /**
      * Reads JSON data from Kafka.
      *
      * Each record's `value` is decoded by the `deserialize` UDF and parsed with [[DcSaleData]]'s
      * schema via `from_json`. The fields of `signal1s.info` are then printed to the console as a
      * streaming query.
      *
      *  https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
      *  https://stackoverflow.com/questions/43297973/how-to-read-records-in-json-format-from-kafka-using-structured-streaming
      *  https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema
      *  https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
      *  https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
      */
    object SparkStructuredStreaming {

      /**
        * Kafka topic to consume. The same name is also passed to the deserializer, so it is defined
        * once here to keep the subscription and the deserialize call in sync.
        */
      private val Topic = "dc-sale-data"

      /** Kafka bootstrap servers for the local test setup. */
      private val BootstrapServers = "localhost:9092"

      /**
        * Holds the deserializer at object level rather than inside `main`. The UDF closure then
        * reaches it through the enclosing object instead of capturing a method-local object that
        * Spark would have to serialize along with the task.
        * NOTE(review): confirm whether WmKafkaDeserializer is Serializable; this layout avoids
        * depending on that.
        */
      private object KafkaDeserializerWrapper {
        val deser = new WmKafkaDeserializer
        // Alternative JSON deserializer:
        // val deser = new WmJsonDeserializer
      }

      def main(args: Array[String]): Unit = {

        val spark = SparkSession
          .builder
          .appName("ReadFromKafka")
          .master("local[*]")
          .getOrCreate()

        spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
          KafkaDeserializerWrapper.deser.deserialize(topic, bytes)
        )

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", BootstrapServers)
          .option("subscribe", Topic)
          .option("startingOffsets", "earliest") // test environment: start from the earliest available data
          .load()

        import spark.implicits._
        // Deserialize the byte array in `value` to recover the original JSON string.
        val result: Dataset[(String, String)] = df
          .selectExpr("CAST(key AS STRING)", s"""deserialize("$Topic", value) AS message""")
          .as[(String, String)]

        val schema = (new DcSaleData).schema

        val parsed = result.select($"key", from_json($"message", schema) as "data")
        // explode only accepts an Array or a Map, so the single `info` struct is first wrapped in a
        // one-element array; the exploded column is named `col`, and "col.*" flattens its fields.
        val event = parsed.select(explode(array($"data.signal1s.info"))).select("col.*")

        event.printSchema()
        val query = event.writeStream
          .format("console")
          .outputMode(OutputMode.Append())
          .start()

        query.awaitTermination()
      }
    }
    

    如果解析出来的结果是空的（全是 null），很可能是 schema 定义有问题，多半是字段类型写错了：默认情况下，from_json 遇到与 schema 不匹配的数据时不会报错，而是返回 null。

    参考

    相关文章

      网友评论

          本文标题：Spark Structured Streaming 解析 JSON

          本文链接:https://www.haomeiwen.com/subject/jhbagftx.html