structured-streaming

Published 2018-03-18 19:02

    Programming model

    The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended to. This leads to a stream-processing model that is very similar to the batch model: you express the streaming computation as a standard batch-style query, just as you would on a static table, and Spark runs it as an incremental query over the unbounded input table. Let's look at this model in more detail.
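
    As a minimal sketch of this model (assuming a local socket text source on port 9999, which is not part of this article), a streaming word count is written exactly like a batch query over a table with a single value column:

    import org.apache.spark.sql.SparkSession

    object ModelSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("ModelSketch")
          .getOrCreate()
        import spark.implicits._

        // the socket source is the unbounded "input table" with one string column named value
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "9999")
          .load()

        // the query itself looks exactly like a batch word count over a static table
        val counts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

        // on each trigger Spark runs the query incrementally and prints the full result
        counts.writeStream
          .outputMode("complete")
          .format("console")
          .start()
          .awaitTermination()
      }
    }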


    Integration with Kafka

    1. Reference documentation

    http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

    2. Kafka version

    Kafka broker version 0.10.0 or higher

    3. Example 1: running in an IDE
    groupId = org.apache.spark
    artifactId = spark-sql-kafka-0-10_2.11
    version = 2.2.0
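
    If the project uses sbt rather than Maven (an assumption; the article only gives the Maven coordinates), the equivalent dependency line would be:

    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"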
    ################ Write the code and launch it from the IDE ##################
    import org.apache.spark.sql.SparkSession

    // place main inside an object, e.g. object StructuredStreamingKafka
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("StructuredStreamingKafka")
          .getOrCreate()

        import spark.implicits._
        // read from Kafka as a streaming DataFrame (columns: key, value, topic, partition, offset, ...)
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers","bigdata-pro02.kfk.com:9092")
          .option("subscribe","weblog")
          .load()
        // cast the binary value column to a string and keep only that column
        val lines = df.selectExpr("CAST(value as STRING)")
          .as[String]

        val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()

        // start the query
        val query = wordCount.writeStream
          .outputMode("complete") // output mode: complete, update, or append
          .format("console")      // sink: write the results to the console
          .start()

        query.awaitTermination()
    }
    ############### Start Kafka and the message producer on the designated nodes ##################
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --replication-factor 3 --partitions 1 --topic weblog
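    The console producer itself is not shown above; for a Kafka 0.10 installation it can be started with the broker and topic used in the commands above:
    bin/kafka-console-producer.sh --broker-list bigdata-pro02.kfk.com:9092 --topic weblog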
    
    4. Example 2: running in spark-shell
    The following jars need to be placed in Spark's jars directory (or passed with --jars, as sketched after the list):
    kafka_2.11-0.10.0.0.jar
    kafka-clients-0.10.0.0.jar
    spark-sql-kafka-0-10_2.11-2.2.0.jar
    spark-streaming-kafka-0-10_2.11-2.1.0.jar
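    Instead of copying the files into Spark's jars directory, they can also be passed on the command line (a sketch, assuming the jars sit in the current directory):
    bin/spark-shell --jars kafka_2.11-0.10.0.0.jar,kafka-clients-0.10.0.0.jar,spark-sql-kafka-0-10_2.11-2.2.0.jar,spark-streaming-kafka-0-10_2.11-2.1.0.jar
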
    ##################### Start spark-shell #################
    bin/spark-shell
    :paste
    import spark.implicits._
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers","bigdata-pro02.kfk.com:9092")
      .option("subscribe","weblog")
      .load()
    val lines = df.selectExpr("CAST(value as STRING)")
      .as[String]
    val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()
    val query = wordCount.writeStream
      .outputMode("complete") 
      .format("console")  
      .start()
    query.awaitTermination()
    

    Integration with MySQL: writing the output to MySQL

    Spark 2.2.0 does not yet provide an API that writes directly to MySQL, but you can subclass ForeachWriter and write each row to MySQL yourself. If the data volume is very large, it is advisable to write to Kafka first and let a consumer drain the Kafka queue into MySQL in order.
    The jdbcSink class

    package toMysql

    import java.sql._

    import org.apache.spark.sql.{ForeachWriter, Row}

    /**
      * Created by zhongyuan on 2018/3/18.
      */
    class jdbcSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
      val driver = "com.mysql.jdbc.Driver"
      var statement: Statement = _
      var connection: Connection = _

      // open the JDBC connection (called for each partition of every trigger)
      override def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        this.statement = connection.createStatement()
        true
      }

      // execute the SQL for each row; a PreparedStatement would be safer than string concatenation
      override def process(value: Row): Unit = {
        statement.executeUpdate("insert into wordcount values('" + value.getAs("value") + "'," + value.getAs("count") + ")")
      }

      // release resources
      override def close(errorOrNull: Throwable): Unit = {
        connection.close()
      }
    }
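
    The insert statement in process() assumes that a wordcount table already exists in the target database. Its definition is not shown in the article; a schema consistent with that statement would be, for example:

    CREATE TABLE wordcount (value VARCHAR(255), count BIGINT);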
    

    The main object: StructuredStreamingKafkaMysql

    package toMysql
    
    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
    import org.apache.spark.sql.streaming.ProcessingTime
    
    /**
      * Created by zhongyuan on 2018/3/18.
      */
    object StructuredStreamingKafkaMysql {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("StructuredStreamingKafkaMysql")
          .getOrCreate()
    
        import spark.implicits._
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers","bigdata-pro02.kfk.com:9092")
          .option("subscribe","weblog")
          .load()
        val lines = df.selectExpr("CAST(value as STRING)") // cast the binary value column to a string and keep only that column
          .as[String]
    
        val wordCount = lines.flatMap(_.split(" ")).groupBy("value").count()
    
        // write the results to an external MySQL database
        val url = "jdbc:mysql://bigdata-pro03.kfk.com/spark"
        val user  = "root"
        val pwd = "123456"
        val writer: ForeachWriter[Row] = new jdbcSink(url, user, pwd) // instantiate the custom sink
        val query = wordCount
          .writeStream
          .foreach(writer) // foreach() only accepts a ForeachWriter[Row], hence writer's declared type
          .outputMode("update")
          .trigger(ProcessingTime("25 seconds"))
          .start()
        query.awaitTermination()
      }
    
    }
    

    Execution order
    1. Start ZooKeeper on all of the designated nodes.
    2. Start Kafka on the designated node.
    3. Start the producer for the weblog topic on the designated node.
    4. Start MySQL on the designated node.
    5. Launch the program from the IDE.
    6. Send messages with the producer.
    7. Query MySQL to verify that the data arrived (see the sketch below).
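
    For the last step, a quick check from the command line (assuming the MySQL credentials and database name used in the code above) might look like:

    mysql -uroot -p123456 -e "SELECT * FROM spark.wordcount;"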
