A Structured Streaming Example: Reading Kafka Data in Real Time

Author: 会飞的蜗牛66666 | Published 2019-01-31 14:46

    As we know, the classic Spark Streaming (DStream) API is no longer under active development; since Spark 2.2, the effort has gone into Structured Streaming. Below we look at how Structured Streaming reads data from Kafka.
    Structured Streaming can read data in two modes, batch and streaming (a batch-read sketch follows the streaming example below):
    package com.ky.service

    import org.apache.log4j.lf5.LogLevel
    import org.apache.spark.sql.{Dataset, SparkSession}

    /**
      * @Author: xwj
      * @Date: 2019/1/31 13:48
      * @Version 1.0
      */
    object KafkaStreaming {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel(LogLevel.ERROR.getLabel)

        import spark.implicits._
        val topic = "kafka"
        val df = spark
          // read is batch reading, readStream is stream reading; likewise write vs. writeStream.
          // For startingOffsets, the default is "latest" for streaming and "earliest" for batch.
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
          .option("subscribe", topic) // subscribe can list multiple topics; use assign to consume specific partitions, subscribe to consume whole topics
          // .option("startingOffsets", "earliest") // reading from specific offsets is only supported in batch queries
          // .option("endingOffsets", "latest")
          .load()
        // key and value arrive as binary, so cast them to strings
        val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
        // check whether this is a streaming Dataset
        println(kafkaDf.isStreaming)
        kafkaDf.printSchema()
        val words = kafkaDf.flatMap(_._2.split(","))
        val wordCounts = words.groupBy("value").count()
        val query = wordCounts
          .writeStream
          .outputMode("complete")
          .format("console")
          .start()
        query.awaitTermination()
      }
    }
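
    For comparison, here is the batch variant mentioned in the comments above: the same source options, but with read instead of readStream, so the query runs once over a fixed offset range and finishes. This is a minimal sketch reusing the brokers and topic from the example above; it assumes the same imports.

    object KafkaBatch {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("KafkaBatch").master("local[*]").getOrCreate()
        import spark.implicits._

        val df = spark
          .read // batch read: supported together with explicit start/end offsets
          .format("kafka")
          .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
          .option("subscribe", "kafka")
          .option("startingOffsets", "earliest")
          .option("endingOffsets", "latest")
          .load()

        // same binary-to-string cast as in the streaming example, then print the result
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
          .show(truncate = false)

        spark.stop()
      }
    }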

    An application combining Structured Streaming with Spark SQL:
    object KafkaStreaming2 {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel(LogLevel.ERROR.toString)
        val topic = "kafka"
        import spark.implicits._
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
          .option("subscribe", topic)
          .load()
        val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
        // keep only well-formed records of the shape device,deviceType,signal
        val value = kafkaDf.filter(_._2.split(",").length == 3)
        val deviceDf: Dataset[DeviceData] = value.map(line => {
          val arr = line._2.split(",")
          DeviceData(arr(0), arr(1), arr(2).toDouble)
        })
        deviceDf.createOrReplaceTempView("test")
        val frame = spark.sql("select * from test").where("signal > 0.5")
        // outputMode("complete") must not be added here: complete mode requires an
        // aggregation, and this query has none, so the default append mode applies
        val query = frame.writeStream.format("console").start()
        query.awaitTermination()
      }

    }

    case class DeviceData(device: String, deviceType: String, signal: Double)
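
    The console sink is fine for debugging; in practice you would often write the filtered stream back out, for example to another Kafka topic. The Kafka sink needs a string (or binary) value column and a checkpoint directory. A minimal sketch of what the end of main in KafkaStreaming2 could look like instead; the output topic "device-out" and the checkpoint path are assumptions for illustration:

        // write the filtered stream back to Kafka instead of the console
        val query = frame
          .selectExpr("device AS key", "CAST(signal AS STRING) AS value")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
          .option("topic", "device-out") // hypothetical output topic
          .option("checkpointLocation", "/tmp/checkpoints/device-out") // required by the Kafka sink
          .start()
        query.awaitTermination()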

    For comparison, the traditional batch approach with SparkContext and SQLContext:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object Test {

      def main(args: Array[String]): Unit = {
        // create a SparkConf and set the application name
        val conf = new SparkConf().setAppName("SQL-2")
        // SQLContext depends on a SparkContext
        val sc = new SparkContext(conf)
        // create the SQLContext
        val sqlContext = new SQLContext(sc)
        // create an RDD from the given input path
        val personRDD = sc.textFile(args(0)).map(_.split(" "))
        // specify the schema of each field directly via StructType
        val schema = StructType(
          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )
        // map the RDD to an RDD of Rows
        val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
        // apply the schema to the row RDD
        val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
        // register a temporary view
        personDataFrame.createOrReplaceTempView("t_person")
        // run SQL
        val df = sqlContext.sql("select * from t_person order by age desc limit 4")
        // save the result as JSON to the given output path
        df.write.json(args(1))
        // stop the SparkContext
        sc.stop()
      }

    }
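
    Since Spark 2.x, the unified SparkSession entry point replaces the separate SparkContext/SQLContext pair. For contrast, a minimal sketch of the same batch job rewritten with SparkSession, taking the input and output paths from args as above (assumes the same spark.sql imports):

    object Test2 {

      def main(args: Array[String]): Unit = {
        // one entry point instead of SparkConf + SparkContext + SQLContext
        val spark = SparkSession.builder().appName("SQL-2").getOrCreate()

        val rowRDD = spark.sparkContext.textFile(args(0))
          .map(_.split(" "))
          .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)
        ))

        val personDataFrame = spark.createDataFrame(rowRDD, schema)
        personDataFrame.createOrReplaceTempView("t_person")

        spark.sql("select * from t_person order by age desc limit 4")
          .write.json(args(1))

        spark.stop()
      }
    }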
