As we know, the DStream-based Spark Streaming API is no longer being actively developed; since Spark 2.2 the project's effort has gone into Structured Streaming. Below we look at how Structured Streaming reads data from Kafka.
Structured Streaming can consume Kafka either as a batch query or as a streaming query (a batch-read sketch follows the streaming example below):
package com.ky.service

import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * @Author: xwj
  * @Date: 2019/1/31 0031 13:48
  * @Version 1.0
  */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val topic = "kafka"
    val df = spark
      // read is a batch read, readStream is a streaming read; write is a batch write, writeStream is a streaming write.
      // startingOffsets defaults to "latest" for streaming queries and "earliest" for batch queries.
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic) // multiple topics can be subscribed; use assign to consume specific partitions, subscribe to consume whole topics
      // .option("startingOffsets", "earliest") // explicit offsets can also be given; endingOffsets is only supported in batch queries
      // .option("endingOffsets", "latest")
      .load()
    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // check whether this Dataset is a streaming Dataset
    println(kafkaDf.isStreaming)
    kafkaDf.printSchema()
    val words = kafkaDf.flatMap(_._2.split(","))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
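As mentioned in the comments above, the same Kafka source can also be consumed as a bounded batch query by using read/write instead of readStream/writeStream. A minimal sketch, reusing the broker addresses and topic from the example (the offset values are just placeholders):

    // Batch read: spark.read instead of spark.readStream; the result is a regular (non-streaming) DataFrame.
    val batchDf = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic)
      .option("startingOffsets", "earliest") // batch queries may also pass explicit per-partition offsets as JSON
      .option("endingOffsets", "latest")     // endingOffsets is only valid for batch queries
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    batchDf.show() // batch results are inspected/written with the regular batch API, not writeStream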
Combining it with Spark SQL:
object KafkaStreaming2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val topic = "kafka"
    import spark.implicits._
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic)
      .load()
    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // keep only records whose value splits into exactly three fields
    val value = kafkaDf.filter(_._2.split(",").length == 3)
    val deviceDf: Dataset[DeviceData] = value.map(line => {
      val arr = line._2.split(",")
      DeviceData(arr(0), arr(1), arr(2).toDouble)
    })
    deviceDf.createOrReplaceTempView("test")
    val frame = spark.sql("select * from test").where("signal>0.5")
    // outputMode("complete") cannot be used here: complete mode requires a streaming aggregation,
    // so this query runs with the default append mode (see the aggregated variant sketched below)
    val query = frame.writeStream.format("console").start()
    query.awaitTermination()
  }
}

case class DeviceData(device: String, deviceType: String, signal: Double)
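As the comment notes, complete output mode is only accepted when the query contains a streaming aggregation. As a sketch of what that would look like, the hypothetical variant below would replace the plain select/filter query inside main with a count per deviceType, which can legitimately run in complete mode:

    // Hypothetical variant: with a streaming aggregation, outputMode("complete") is allowed
    // and the full result table is re-emitted on every trigger.
    val countsByType = deviceDf.groupBy("deviceType").count()
    val aggQuery = countsByType
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    aggQuery.awaitTermination()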
Comparison with the traditional batch approach based on SparkContext and SQLContext:
object Test {

  def main(args: Array[String]): Unit = {
    // create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // create the SQLContext
    val sqlContext = new SQLContext(sc)
    // create an RDD from the given input path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // map the split lines to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // register a temporary view
    personDataFrame.createOrReplaceTempView("t_person")
    // run the SQL query
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // write the result as JSON to the given output path
    df.write.json(args(1))
    // stop the SparkContext
    sc.stop()
  }
}
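For comparison, in Spark 2.x the same batch job is normally written against SparkSession, which subsumes the separate SparkContext/SQLContext pair used above. A rough sketch under the same assumptions (space-separated input at args(0), JSON output to args(1)); the object name here is just illustrative:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TestWithSparkSession {

  def main(args: Array[String]): Unit = {
    // SparkSession is the single entry point in Spark 2.x; SparkContext and SQLContext are reachable through it
    val spark = SparkSession.builder().appName("SQL-2").getOrCreate()
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    val rowRDD = spark.sparkContext.textFile(args(0))
      .map(_.split(" "))
      .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)
    personDataFrame.createOrReplaceTempView("t_person")
    spark.sql("select * from t_person order by age desc limit 4").write.json(args(1))
    spark.stop()
  }
}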