美文网首页大数据
一篇文章学会spark-streaming

一篇文章学会spark-streaming

作者: bigdataer | 来源:发表于2017-03-10 18:16 被阅读193次

    版权申明:转载请注明出处。
    文章来源:http://bigdataer.net

    1.什么是spark-streaming?

    实际生产中会有许多应用到实时处理的场景,比如:实时监测页面点击,实时监测系统异常,实时监测来自于外部的攻击。针对这些场景,twitter研发了实时数据处理工具storm,并在后来开源。spark针对这些场景设计了spark-streaming实时计算模型,它允许用户使用一系列批处理的API去处理实时数据,能做到代码逻辑的重复使用。
    和spark中的rdd非常相似,spark-streaming中使用离散化流(discretized stream)作为抽象的表示,叫做DStream。它是随时间推移而收集数据的序列,每个时间段收集到的数据在DStream内部以一个RDD的形式存在。DStream支持从kafka,flume,hdfs,s3等获取输入。DStream也支持两种操作,即转化操作和输出操作(区别于RDD中的行动操作)。转化操作又分为无状态的转化操作和有状态的转化操作,无状态的转化操作有map,filter,flatmap,repartition等,是针对单个时间区间内的操作。而有状态的转化操作可以针对不同的时间区间,后面详述。

    2.两个简单的例子

    2.1 监听socket获取数据,代码如下:
    这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息

    object SocketStream {
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
        //接收消息
        val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    2.2 从kafka读取数据,比较常用

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        //监测关键字error,出现则print
        dstream.filter(_.contains("error")).foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    3.再来谈架构

    通过上面两个例子,你可能对spark-streaming有了初步的了解,我们再来看一下它的架构。
    Spark-streaming使用"微批次"的架构,把流式计算当做一系列微型的批处理操作来对待,每个时间段都产生一个RDD。如图:


    wpc

    作用于一个DStream上的无状态转化操作会对它其中的每个RDD生效,如针对一个输入为语句的DStream做flatMap操作的示意图如下:


    shiyitu

    4.转化操作

    4.1 无状态的转化操作。
    无状态转化操作就是简单的将转化作用于DStream的每个RDD上面。下面列举了一些常见的转化操作,其中最后一个transform表示可以试用自定义的转化函数,尽管它前面已经提供了很多现成的API。


    wzt

    4.2有状态的转化操作。
    有状态的转化操作是跨时间段的数据操作,一些先前的批次也被用来在新的批次中做计算。主要有滑动窗口和updateStateByKey。前者以一个时间段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化。有状态的转化操作需要打开检查点机制来保证容错性。即:给ssc.checkpoint()设置一个检查点目录。
    (1)基于窗口的转化操作会在一个比ssc设置的更长的时间段内,通过整合多个批次的,计算出整个大的时间窗口的结果。基于窗口的操作需要两个参数,一个是窗口时长,一个是滑动步长。这两个参数是ssc设置的时长的整数倍。下面的图表示了一个时间窗口为3,滑动步长为2的窗口转化操作。


    window
    前面提到的监测关键字error的例子,现在需要每隔20s就对前面30s有error的日志记录做计数,代码如下:
    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
          .map(_._2)
        //每隔20s对前30s出现error的日志做计数
        val errors = dstream.window(Seconds(30),Seconds(20))
            .filter(_.contains("error"))
            .count()
        errors.foreachRDD(rdd=>{
          rdd.foreach(println(_))
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    (2)updateStateByKey
    updateStateByKey能对键值对的数据进行不同批次间的数据计算,使用updateStateByKey,需要传入一个update函数,这个函数接收某个key最新批次对应的values,以及该key之前对应的value,按照自定义的逻辑返回一个新的value。如需要计算一个实时日志中http响应码的计数,代码如下:

    object KafkaStream {
    
      def main(args: Array[String]): Unit = {
        //输出目录
        val output = args(0)
        //本地测试,设置4核
        val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
        //以10秒为一个批次
        val ssc = new StreamingContext(conf,Seconds(10))
    
        val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
        val group_id = "realtime_data"
    
        //kafka相关参数
        val kafka_param = Map[String,String](
          "zookeeper.connect" ->zkQuorum,
          "group.id" -> group_id,
          "zookeeper.connection.timeout.ms" -> "10000",
          "fetch.message.max.bytes" -> "10485760"
        )
        val topic = Map[String,Int]("test_topic" -> 16)
        //接收消息
        val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
        val rdd = dstream.map(_.split("\001"))
          .map(x=>(x(0),x(1).toLong))
          .updateStateByKey(update)
        //输出
        rdd.foreachRDD(_.saveAsTextFile(output))
        ssc.start()
        ssc.awaitTermination()
      }
      //update函数
      def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
        val current_num = new_values.size
        val result_num = current_num + old_value.getOrElse(0L)
        Some(result_num)
      }
    }
    

    (3)所有有状态转化操作


    state

    5.输出操作

    输出操作比较简单,有以下几种:


    save

    6.作业稳定性

    spark-streaming作业一般都要全天候不间断运行,那么作业的稳定性如何保证?主要有以下几点:
    6.1 检查点机制。
    其原理就是阶段性的将作业运行的数据存放到存储系统,如hdfs,s3等。当作业运行出现异常时可以从上述数据中恢复。
    6.2 驱动器容错。
    在创建实时计算作业的上下文时使用getOrCreate函数。代码如下:

        val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
        def createContext(): StreamingContext  ={
          val sc = new SparkContext(conf)
          val ssc = new StreamingContext(sc,Seconds(10))
          ssc.checkpoint(cp_dir)
        }
    

    更多文章请关注微信公众号:bigdataer

    wx

    相关文章

      网友评论

        本文标题:一篇文章学会spark-streaming

        本文链接:https://www.haomeiwen.com/subject/spdogttx.html