用 parquet 数据模拟实时数据流
import ohmysummer.conf.{KafkaConfiguration, UriConfiguration}
import ohmysummer.pipeline.schema.{EnterpriseSchema, NationSchema}
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{col, struct, to_json}
/**
* 读取 parquet 文件转为 JSON 后写到 HDFS, 在用命令行将 JSON 数据逐行发到 Kakfa 模拟实时流
*/
object WriteEnterprise2Kafka {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder
.master("local[2]")
.appName("Write Enterprise Parquet to Kafka")
.getOrCreate()
val parquetSchema = (new EnterpriseSchema).schema
val parqurtUri = (new UriConfiguration).xs6enterprise
val topics = (new KafkaConfiguration).topics
val bootstrap_servers = (new KafkaConfiguration).bootstrap_servers
import spark.implicits._
val ds: DataFrame = spark.readStream
.schema(parquetSchema)
.parquet(parqurtUri)
.filter(($"timestamp" isNotNull) && ($"timestamp" > 956678797000L) && ($"timestamp" < 1924876800000L) )
val df: DataFrame = ds.select($"vin" as "key", to_json( struct( ds.columns.map(col(_)):_* ) ) as "value" )
.filter($"key" isNotNull)
// 将 parquet 写为 json
val jdf = df
.writeStream
.format("json")
.option("path", "/tmp/json/nation")
.option("checkpointLocation", "/tmp/write-json2hdfs")
.start()
jdf.awaitTermination()
}
}
再将 JSON 数据逐行发到 Kafka 的不同 topic:
hdfs dfs -cat hdfs://xxxxxx/json/test.json | while read -r LINE; do echo $LINE | sed "s/\"}$/\",\"partition\":$(( ( RANDOM % 5 ) + 1 ))}/"; sleep 1; done | kt produce -topic xs6-nation-test -brokers "dn03,nn01,nn02" ; sleep 0.1; done
网友评论