Spark Structured Streaming: Parsing JSON with Non-Fixed Fields

Author: 焉知非鱼 | Published 2018-09-15 22:51

    Sample Data

    There are two files. The first is a JSON file, a.json:

    {
      "createTime": 1532598069,
      "event": {
        "info": {
           "AAA": "one",
           "BBB": "two",
           "DDD": "opps"
        }
      }
    }
    

    The other is also a JSON file, b.json:

    {
      "createTime": "1532598069",
      "event": {
        "info": {
           "AAA": "three",
           "BBB": "four",
           "CCC": "haha"
        }
      }
    }
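
    Note that the two payloads share AAA and BBB, but a.json carries DDD where b.json carries CCC; createTime is also a JSON number in one file and a string in the other.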
    

    Kafka Producer

    The number of fields inside info is not fixed. First, send a.json to Kafka with the following code:

    from confluent_kafka import Producer
    
    
    p = Producer({'bootstrap.servers': 'localhost:9092'})
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    with open("/Users/ohmycloud/work/notes/a.json") as f:
        data = f.read()
        p.poll(0)  # serve delivery callbacks from earlier produce() calls
        p.produce('dynamic-schema', data.encode('utf-8'), callback=delivery_report)
    
    p.flush()
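
    Running the script should print a delivery report along the lines of "Message delivered to dynamic-schema [0]" (the partition number depends on the topic). To send b.json later, change the open() path and run the script again.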
    

    Kafka Consumer

    package dynamic.schema.test
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import org.apache.spark.sql.types._
    
    
    object DynamicSchema extends App {
      val spark = SparkSession
        .builder
        .appName("DynamicSchema")
        .master("local[*]")
        .getOrCreate()
    
      // Define a schema that covers every field that can appear in the JSON
      val schema = new StructType()
        .add("createTime", StringType)
        .add("event", MapType(StringType, new StructType()
          .add("AAA", StringType, true)
          .add("BBB", StringType, true)
          .add("CCC", StringType, true)
          .add("DDD", StringType, true)
        ))
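      // "event" is modeled as a Map (rather than a struct with a fixed "info"
      // field) so that its entries can be exploded generically downstream.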
    
      val parsed = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "dynamic-schema")
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    
      import spark.implicits._
    
      // Explode the map into (key, value) rows, then flatten the value struct;
      // fields missing from a given message surface as null.
      val event = parsed.select(explode($"parsed_value.event")).select("value.*")
    
      val console = event.writeStream
        .format("console")
        .outputMode(OutputMode.Append())
    
      val query = console.start()
    
      query.awaitTermination()
    
    }
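
    Run the consumer from an IDE or via spark-submit (with the Spark Kafka source dependency on the classpath). Because startingOffsets is earliest, a message produced before the query starts is still picked up.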
    

    The printed output is:

    +---+---+----+----+
    |AAA|BBB| CCC| DDD|
    +---+---+----+----+
    |one|two|null|opps|
    +---+---+----+----+
    

    Since a.json has no CCC field and the schema declares CCC as nullable, parsing succeeds and CCC simply comes out as null.

    Then send b.json; the printed output is:

    +-----+----+----+----+
    |  AAA| BBB| CCC| DDD|
    +-----+----+----+----+
    |three|four|haha|null|
    +-----+----+----+----+
    

    b.json has no DDD field; the schema declares DDD as nullable, so DDD comes out as null.

    This confirms the schema behavior: one superset schema with nullable fields handles JSON messages whose field sets vary.
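
    As a quicker check without Kafka, the same parsing logic can be run in batch mode. Here is a minimal sketch (the object name SchemaCheck is illustrative, not from the original setup) that pushes both sample payloads through from_json with the schema above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    object SchemaCheck extends App {
      val spark = SparkSession.builder
        .appName("SchemaCheck")
        .master("local[*]")
        .getOrCreate()
      import spark.implicits._

      // Same superset schema as the streaming job above.
      val schema = new StructType()
        .add("createTime", StringType)
        .add("event", MapType(StringType, new StructType()
          .add("AAA", StringType, true)
          .add("BBB", StringType, true)
          .add("CCC", StringType, true)
          .add("DDD", StringType, true)
        ))

      // Both sample payloads as a one-column Dataset[String] named "value".
      val rows = Seq(
        """{"createTime":1532598069,"event":{"info":{"AAA":"one","BBB":"two","DDD":"opps"}}}""",
        """{"createTime":"1532598069","event":{"info":{"AAA":"three","BBB":"four","CCC":"haha"}}}"""
      ).toDS()

      rows.select(from_json($"value", schema).alias("parsed_value"))
        .select(explode($"parsed_value.event")) // map -> (key, value) rows
        .select("value.*")                      // flatten; missing fields show as null
        .show()
    }

    The output should show one row per payload, with CCC null for the first and DDD null for the second, matching the two streaming results above.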
