美文网首页
用 parquet 数据模拟实时数据流

用 parquet 数据模拟实时数据流

作者: 焉知非鱼 | 来源:发表于2019-02-20 16:26 被阅读0次

    用 parquet 数据模拟实时数据流

    import ohmysummer.conf.{KafkaConfiguration, UriConfiguration}
    import ohmysummer.pipeline.schema.{EnterpriseSchema, NationSchema}
    import org.apache.spark.sql._
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.functions.{col, struct, to_json}
    
    /**
      * Reads the enterprise parquet data as a stream, serialises every row to JSON and
      * writes the result with the JSON file sink. The JSON lines are afterwards sent to
      * Kafka one by one from the command line in order to simulate a real-time stream.
      */
    object WriteEnterprise2Kafka {

      /**
        * Starts the parquet-to-JSON streaming query and blocks until it terminates.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder
          .master("local[2]")
          .appName("Write Enterprise Parquet to Kafka")
          .getOrCreate()

        val parquetSchema = (new EnterpriseSchema).schema
        val parquetUri = (new UriConfiguration).xs6enterprise

        import spark.implicits._

        // The schema is supplied explicitly instead of being inferred from the files.
        // Only rows with a non-null timestamp strictly between the two bounds are kept
        // (presumably epoch milliseconds -- TODO confirm against the data).
        val ds: DataFrame = spark.readStream
          .schema(parquetSchema)
          .parquet(parquetUri)
          .filter(
            $"timestamp".isNotNull &&
              ($"timestamp" > 956678797000L) &&
              ($"timestamp" < 1924876800000L)
          )

        // Shape each row as a key/value pair: `vin` becomes the key and the whole row,
        // encoded as a JSON string, becomes the value. Rows without a key are dropped.
        val allColumns = ds.columns.map(col)
        val df: DataFrame = ds
          .select($"vin".as("key"), to_json(struct(allColumns: _*)).as("value"))
          .filter($"key".isNotNull)

        // Write the parquet rows out as JSON.
        // NOTE(review): the output path says "nation" while the input is the enterprise
        // data set -- confirm this is intended.
        val query = df
          .writeStream
          .format("json")
          .option("path", "/tmp/json/nation")
          .option("checkpointLocation", "/tmp/write-json2hdfs")
          .start()

        query.awaitTermination()
      }
    }
    

    再将 JSON 数据逐行发到 Kafka 的 topic(每条消息随机指定一个 partition):

    # Stream the JSON file line by line into a single long-running `kt produce` process.
    # sed appends a random "partition" field (1-5) before the closing brace of each record,
    # and the loop sleeps 1 second between records to imitate a live stream.
    # "$LINE" is quoted so the JSON is passed through without word splitting or globbing.
    hdfs dfs -cat hdfs://xxxxxx/json/test.json |
      while IFS= read -r LINE; do
        printf '%s\n' "$LINE" | sed "s/\"}$/\",\"partition\":$(( ( RANDOM % 5 )  + 1 ))}/"
        sleep 1
      done |
      kt produce -topic xs6-nation-test -brokers "dn03,nn01,nn02"
    

    相关文章

      网友评论

          本文标题:用 parquet 数据模拟实时数据流

          本文链接:https://www.haomeiwen.com/subject/zubhyqtx.html