美文网首页
Spark Streaming 1.基本操作

Spark Streaming 1.基本操作

作者: caster | 来源:发表于2021-06-08 09:14 被阅读0次

    1. 数据分析分类:

    流式数据处理:多条数据缓冲一起处理
    批量数据处理:一条数据一处理

    实时数据处理:数据处理延迟时间毫秒
    离线数据处理:数据处理延迟时间小时/天

    2. 微批

    Spark Streaming准实时(秒/分钟),微批流(控制时间采集周期)处理方式。
    Spark Streaming将数据抽象为离散化流DStream,每个时间区间内的数据都作为RDD。DStream是对RDD的封装,形成RDD序列。

    3. 背压机制

    调整接收数据速率适配处理和输出数据的能力。

    4. 简单Demo(无状态)

    此种方式,每次采集周期结果不会累加,不会保存上次记录,即无状态的。

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //采集周期:3s一次
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    
    //监听本地9999端口
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    
    val words = lines.flatMap(_.split(" "))
    
    val wordToOne = words.map((_,1))
    
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_)
    
    wordToCount.print()
    
    // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
    // 如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
    //ssc.stop()
    // 1. 启动采集器
    ssc.start()
    // 2. 等待采集器的关闭
    ssc.awaitTermination()
    

    5. DStream

    DSstream构造方式:

    1. 继承ReceiverT
    2. 实现onStart()和onStop()方法

    6. DStream保存采集周期结果(有状态流)

    通过updateStateByKey更新之前周期中统计的结果;
    每个周期的统计结果存在checkpoint文件中

    val sparkConf = new SparkConf().setMaster("local").setAppName("SparkStreaming")
    
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //缓冲区文件的存储目录
    ssc.checkpoint("cp")
    
    // 无状态数据操作,只对当前的采集周期内的数据进行处理
    // 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
    // 使用有状态操作时,需要设定检查点路径
    val datas = ssc.socketTextStream("localhost", 9999)
    
    val wordToOne = datas.map((_,1))
    
    //val wordToCount = wordToOne.reduceByKey(_+_)
    
    // updateStateByKey:根据key对数据的状态进行更新
    // 传递的参数中含有两个值
    // 第一个值表示相同的key的value数据
    // 第二个值表示缓存区相同key的value数据
    val state = wordToOne.updateStateByKey(
        ( seq:Seq[Int], buff:Option[Int] ) => {
            val newCount = buff.getOrElse(0) + seq.sum
            Option(newCount)
        }
    )
    
    state.print()
    
    ssc.start()
    ssc.awaitTermination()
    

    7. DStream提供的方法

    1. transform():获取rdd操作后返回rdd,与map()区别:
    
    // transform方法可以将底层RDD获取到后进行操作
    // 1. DStream功能不完善
    // 2. 需要代码周期性的执行
    
    // 执行位置 : Driver端
    val newDS: DStream[String] = lines.transform(
        rdd => {
            // 执行位置 : Driver端,(周期性执行)
            rdd.map(
                str => {
                    // Code : Executor端
                    str
                }
            )
        }
    )
    // 执行位置 : Driver端
    val newDS1: DStream[String] = lines.map(
        data => {
            // 执行位置 : Executor端
            data
        }
    )
    
    1. join():两个流join,底层为两个RDD join,每个周期内的数据进行join,不会保留状态
    val joinDS = ds1.join(ds2)
    
    1. window():
      窗口时长:每次计算的时间范围(采集周期倍数)
      滑动步长:多久计算一次(每次滑动计算一次,默认每个采集周期计算一次)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    
    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))
    
    // 窗口的范围长度应该是采集周期的整数倍
    // 窗口可以滑动的,但是默认情况下,一个采集周期进行滑动,可能会出现重复数据的计算
    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6))
    // 为了避免重复数据,可以改变滑动的滑动(步长)
    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
    
    
    val wordToCount = windowDS.reduceByKey(_+_)
    wordToCount.print()
    
    ssc.start()
    ssc.awaitTermination()
    

    countBywindow():统计窗口内元素数量
    reduceByKeyAndWindow():当窗口范围比较大,但是滑动幅度比较小,可以采用增加数据和删除数据的方式,防止重复计算,提升性能。
    ......

    8. DStream输出操作

    惰性求值,需要行动算子触发(print等)
    foreachRDD():底层rdd操作

    9. 关闭Spark Stream程序:

    ssc.start()
    
    // 如果想要关闭采集器,那么需要创建新的线程
    // 而且需要在第三方程序中增加关闭状态
    new Thread(
        new Runnable {
            override def run(): Unit = {
                while ( true ) {
                    if (SomeThing) {//通过外部系统判定是否需要关闭
                        // 获取SparkStreaming状态
                        val state: StreamingContextState = ssc.getState()
                        if ( state == StreamingContextState.ACTIVE ) {
                            ssc.stop(true, true)//延迟关闭
                        }
                        System.exit(0)
                    }
                    Thread.sleep(5000)
                }
                
            }
        }
    ).start()
    
    ssc.awaitTermination() // block 阻塞main线程
    

    10. 关闭后恢复Spark Stream

    从之前的checkpoint恢复数据或者重新创建:

    val ssc = StreamingContext.getActiveOrCreate("cp", ()=>{
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
    
        val lines = ssc.socketTextStream("localhost", 9999)
        val wordToOne = lines.map((_,1))
    
        wordToOne.print()
    
        ssc
    })
    
    ssc.checkpoint("cp")
    

    相关文章

      网友评论

          本文标题:Spark Streaming 1.基本操作

          本文链接:https://www.haomeiwen.com/subject/ihtleltx.html