Spark Streaming进阶

作者: 董二弯 | 来源:发表于2019-05-22 17:36 被阅读3次

    在前面Spark Streaming入门的基础上继续深入学习Spark Streaming

    StreamingContext

    初始化一个Spark Streaming程序时必须要创建StreamingContext作为程序的入口。
    一旦StreamingContext定义好以后,就可以做如下的事情

    • 定义输入源通过创建输入DStreams
    • 定义流的操作使用transformation输出操作到Dsteams
    • 开始接收数据和进行启动streamingContext.start()
    • 等待进程的停止streamingContext.awaitTermination()或者手动停止streamingContext.stop().
      注意事项:
    • StreamingContext启动后,新的流计算将部能被添加设置
    • StreamingContext停止在后,不能重启,可以把整个作业停掉,在重启。
    • 只有一个StreamingContext被激活在JVM同一个时间点

    输入DStream和Receiver

    输入DStream(InputDStream/ReceiverInputDStream)

    输入DStream代表了来自数据源的输入数据流。在之前的wordcount例子中,lines就是一个输入DStream(JavaReceiverInputDStream),代表了从netcat(nc)服务接收到的数据流。输入DStream分为InputDStream和ReceiverInputDStream两种,其中文件数据流(FileInputDStream)即是一个InputDStream,它监听本地或者HDFS上的新文件,然后生成RDD,其它输入DStream为ReceiverInputDStream类型,都会绑定一个Receiver对象。输入DStream是一个关键的组件,用来从数据源接收数据,并将其存储在Spark的内存中,以供后续处理。
    Spark Streaming提供了两种内置的数据源支持;

    • 基础数据源:StreamingContext API中直接提供了对这些数据源的支持,比如文件、socket、Akka Actor等。
    • 高级数据源:诸如Kafka、Flume、Kinesis、Twitter等数据源,通过第三方工具类提供支持。这些数据源的使用,需要引用其依赖。
    • 自定义数据源:我们可以自己定义数据源,来决定如何接受和存储数据。

    Receiver

    ReceiverInputDStream类型的输入流,都会绑定一个Receiver对象。整体流程如下


    image.png

    由于Receiver独占一个cpu core,所以ReceiverInputDStream类型的作业在本地启动时绝对不能用local或者local[1],因为那样的话,只会给执行输入DStream的executor分配一个线程。而Spark Streaming底层的原理是,至少要有两条线程,一条线程用来分配给Receiver接收数据,一条线程用来处理接收到的数据。正确的做法时local[n],n>Receiver的数量。

    Transformations on DStreams

    Transformed DStream 是由其他DStream 通过非Output算子装换而来的DStream
    例如例子中的lines通过flatMap算子转换生成了FlatMappedDStream:

        val words = lines.flatMap(_.split(" "))
    

    其他的方法这里就不写了,不会的可以看看官网

    Transformation其实和rdd里面的没有什么区别,多了下面两个:


    image.png

    在后面实战中通过代码详细讲解。

    Output Operations on DStreams

    输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。 由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有DStream转换的实际执行(这里的Transform Operation也就是RDD中的action。)。 这里有一个不同的:


    image.png

    案列

    UpdateStateByKey算子的使用

    这个算子的意思就是:统计你的streaming启动到现在为止的信息。
    回顾入门课程中的wordcount案列,我们若第一次输入 a b c。经过处理后输入[a:1,b:1,c:1],第二次在输入a b c,同样输出[a:1,b:1,c:1]。那么怎么样实现累计加,输出[a:2,b:2,c:2]呢?此时就需要UpdateStateByKey来解决,如何使用?下面两步走

    • 定义状态 :状态可以是任意数据类型。
    • 定义状态更新函数 :使用函数指定如何使之前的状态更新为现在的状态。
      代码如下
    object UpdateStateByKey {
      def main(args: Array[String]): Unit = {
        //创建SparkConf
        val conf=new SparkConf().setAppName("UpdateStateByKey").setMaster("local[2]")
        //通过conf 得到StreamingContext,底层就是创建了一个SparkContext
        val ssc=new StreamingContext(conf,Seconds(10))
    //启用checkpoint(用户存储中间数据),需要设置一个支持容错 的、可靠的文件系统(如 HDFS、s3 等)目录来保存 checkpoint 数据,
    ssc.checkpoint("/root/data/sparkStreaming_UpdateStateByKey_out")
        //通过socketTextStream创建一个DSteam
        val DStream=ssc.socketTextStream("192.168.30.130",9999)
    
        DStream.flatMap(_.split(",")).map((_,1))
          .updateStateByKey(updateFunction).print()
    
        ssc.start()  // 一定要写
        ssc.awaitTermination()
      }
    
      def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
        val curr = currentValues.sum
        val pre = preValues.getOrElse(0)
        Some(curr + pre)
      }
    }
    

    这里的updateFunction方法就是需要我们自己去实现的状态跟新的逻辑,currValues就是当前批次的所有值,preValue是历史维护的状态,updateStateByKey返回的是包含历史所有状态信息的DStream。

    通过socket监听一个端口收集数据,存储到mysql中

    数据库建表语句

    create table wc(
    word char(10),
    count int
    );
    

    程序代码

    object ForEachRDD {
        def main(args: Array[String]): Unit = {
          val conf=new SparkConf().setAppName("ForEacheRDD").setMaster("local[2]")
          val ssc=new StreamingContext(conf,Seconds(10))
    
          val DStream=ssc.socketTextStream("192.168.30.130",9999)
          //wc
          val result=DStream.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey(_+_)
    
          //把结果写入到mysql
          //foreachRDD把函数作用在每个rdd上
          dstream.foreachRDD { rdd =>
               rdd.foreachPartition { partitionOfRecords =>
              // ConnectionPool is a static, lazily initialized pool of connections
              val con=getConnection()
              partitionOfRecords.foreach(record => {
                 val word=record ._1
                 val count=record ._2.toInt
                //sql
                val sql=s"insert into wc values('$word',$count)"
                //插入数据
                val pstmt=con.prepareStatement(sql)
                pstmt.executeUpdate()
                //关闭
               pstmt.close()
                con.close()
            })
        }
      }
          ssc.start()
          ssc.awaitTermination()
      }
        def getConnection(): Connection={
          //加载驱动
          Class.forName("com.mysql.jdbc.Driver")
          //准备参数
          val url="jdbc:mysql://localhost:3306/spark"
          val username="root"
          val password="root"
          val con=DriverManager.getConnection(url,username,password)
           con
        }
    }
    

    可以把数据库插入部分替换为

         result.foreachRDD(rdd=>{
            rdd.foreach(x=>{
              val con=getConnection()
              val word=x._1
              val count=x._2.toInt
              //sql
              val sql=s"insert into wc values('$word',$count)"
              //插入数据
              val pstmt=con.prepareStatement(sql)
              pstmt.executeUpdate()
              //关闭
              pstmt.close()
              con.close()
            })
          })
    

    这么做程序虽然也能执行成功,但针对的是每一个rdd都创建一个数据库连接,非常的消耗资源。用foreachPartition的好处是每一个分区创建一个连接,性能大大提升。
    此时会有疑问,把获取数据库连接的代码 放在rdd.foreach之前不久可以解决多次获取连接的问题了吗?这里绝对不能放在外面,因为放在外面会报序列化错误


    image.png

    原因是放在外面的代码执行在 执行在driver端,数据库插入操作执行在worker端。这就涉及到了跨网络传输,肯定会出现序列化的问题。
    程序优化:
    这里把数据库连接放在连接池中更佳。

    transform实现黑名单

    上面提到transform的含义就是DStream和RDD之间的转换。
    我们以模拟黑名单为例对数据进行过滤

    object Transform {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("Transform").setMaster("local[2]")
        val ssc=new StreamingContext(conf,Seconds(10))
    
        //黑名单
        val black=Array(
          "laowang",
          "lisi"
        )
    
        //读取数据生成rdd,方便rdd的join
        val blackRDD=ssc.sparkContext.parallelize(black).map(x=>(x,true))
    
        //输入数据
        //"1,zhangsan,20","2,lisi,30", "3,wangwu,40", "4,laowang,50"
        val DStream=ssc.socketTextStream("192.168.30.130",9999)
        val output=DStream.map(x=>(x.split(",")(1),x)).transform(rdd=>{
              rdd.leftOuterJoin(blackRDD).
                filter(x=>x._2._2.getOrElse(false)!=true).map(x=>x._2._1)
        })
        //输出
        output.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    窗口函数的使用

    Spark Streaming还提供了窗口化计算,这些计算允许您在滑动的数据窗口上应用变换,主要用于每隔一个时间段计算一个时间段数据这种场景。 那到底什么时滑动窗口呢,我看先看一幅图


    image.png

    如图所示,每当窗口在源DStream上滑动时,该窗口内的RDD被组合,每一次对三个time(自己设置的)进行计算,间隔两个time进行一次计算。所以要设置两个参数:

    • 窗口长度 - 窗口的持续时间(图中的小框框)。
    • 滑动间隔 - 执行窗口操作的时间间隔(图中每个框的间隔时间)。
      如图所示就可能出现一个问题,设置的间隔太短就可能出现重复计算的可能,或者某些数据没有计算,这些也是很正常的。

    需求:每隔3s计算过去5s字符出现的次数
    此时把窗口长度设置为5,滑动间隔设置为3即可。

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by grace on 2018/6/7.
      */
    object WindowOperations {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("WindowOperations").setMaster("local[2]")
        val ssc=new StreamingContext(conf,Seconds(1))
    
        val DStream=ssc.socketTextStream("192.168.30.130",9999)
    
        //wc
     DStream.flatMap(_.split(",")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(5),Seconds(3)).print
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    整合spark sql进行操作

    我们可以使用DataFrame和SQL来操作流式数据,但是你必须使用StreamingContext正在使用的SparkContext来创建SparkSession,如果driver出现了故障,只有这样才能重新启动。

    object DataFrameAndSQLOperations {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("DataFrameAndSQLOperations").setMaster("local[2]")
        val ssc=new StreamingContext(conf,Seconds(10))
        val DStream=ssc.socketTextStream("192.168.30.130",9999)
        val result=DStream.flatMap(_.split(","))
        result.foreachRDD(rdd=>{
          val spark =SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._
          // Convert RDD[String] to DataFrame
          val wordsDataFrame = rdd.toDF("word")
    
          // Create a temporary view
          wordsDataFrame.createOrReplaceTempView("words")
    
          // Do word count on DataFrame using SQL and print it
          val wordCountsDataFrame =
            spark.sql("select word, count(*) as total from words group by word")
          wordCountsDataFrame.show()
    
        })
    
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    相关文章

      网友评论

        本文标题:Spark Streaming进阶

        本文链接:https://www.haomeiwen.com/subject/ydsyzqtx.html