美文网首页
PySpark-数据操作-DStream

PySpark-数据操作-DStream

作者: NEO_X | 来源:发表于2019-07-07 20:49 被阅读0次

    内容摘入自<<Python大数据分析从入门到精通>>

    附书源码下载地址

    更多信息https://blue-shadow.top/

    Spark DStream

    作为Apache Spark API的扩展,Spark Streaming是容错的高吞吐量系统。它处理实时数据流。Spark流取得输入来自各种输入可靠来源,如flume,HDFS,和kafka等,并且然后将已处理的数据发送到文件系统,数据库或实时仪表盘。输入数据流被分成批量数据,然后批量生成结果的最终流。

    Spark DStream(Discretized Stream)是Spark Streaming的基本抽象。DStream是一个连续的数据流。它接收来自各种来源的输入,如Kafka,Flume,Kinesis或TCP套接字。它也可以是通过转换输入流生成的数据流。DStream的核心是连续的RDD(Spark抽象)流。DStream中的每个RDD都包含来自特定间隔的数据。

    DStream上的任何操作都适用于所有底层RDD。DStream涵盖了所有细节。它为开发人员提供了一个高级API,以方便使用。因此,Spark DStream便于处理流数据

    Spark Streaming为DStream 提供了与RDD相同的容错属性。只要输入数据的副本可用,它就可以使用RDD的谱系从它重新计算任何状态。默认情况下,Spark会复制两个节点上的数据。因此,Spark Streaming可以承受单个工作者故障

    dstream.png

    Spark DStream操作

    • transformation操作
      DStream中有两种类型的转换: 无状态转换 , 有状态的转变

      • 无状态转换
        每批的处理不依赖于先前批次的数据。无状态转换是简单的RDD转换。它适用于每个批处理,意味着DStream中的每个RDD。它包括常见的RDD转换,如map(),filter(),reduceByKey()等。虽然这些函数看起来像应用于整个流,但每个DStream都是许多RDD(批处理)的集合。因此,每个无状态转换都适用于每个RDD

      • 有状态的转变
        它使用先前批次的数据或中间结果,并计算当前批次的结果。状态转换是对DStream的操作,可以跨时间跟踪数据。因此,它利用先前批次中的一些数据来生成新批次的结果,两种主要类型是窗口操作,其作用于时间段的滑动窗口,以及updateStateByKey(),其用于跟踪每个键的事件的状态(例如,构建表示每个用户会话的对象)

    • Output操作

      一旦我们在转换后获得数据,就会在Spark Streaming中执行该数据输出操作。在我们程序的调试之后,使用输出操作我们只能保存输出。一些输出操作是print(),save()等。保存操作将目录保存到文件中,并带有可选的后缀

    代码举例

    # 每隔一段时间生成一个文件,模拟数据文件流的生成,用于DStream监控对应的文件夹,进行文件处理
    import time
    import datetime
    
    def generate_file():
        t = time.strftime('%Y-%m-%d',time.localtime())
        newfile = t + '.txt' 
        f = open(newfile,'w')
        f.write(newfile) 
        f.close()
    
    if __name__ == '__main__':
        generate_file()
    
    # 使用DStream进行流处理
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext
    
    def read_file_stream():
        sc = SparkContext.getOrCreate()
        ssc = StreamingContext(sc, 1)
    
        stream_data = ssc.textFileStream("D:\Developing\data")
        stream_data.pprint()
        ssc.start()
        ssc.awaitTermination()
    
    def save_stream_rdd():
        sc = SparkContext.getOrCreate()
        spark = SparkSession(sc)
        ssc = StreamingContext(sc, 1)
        stream_data = ssc.textFileStream("D:\Developing\data")   
        value = stream_data.countByValue()
        ssc.start()
        ssc.awaitTermination()
    
    if __name__=="__main__":
        read_file_stream()
        #save_stream_rdd()
    
    dstream_test.png

    上一篇:pyspark-数据操作-rdd
    下一篇:pyspark-数据操作-dstream

    相关文章

      网友评论

          本文标题:PySpark-数据操作-DStream

          本文链接:https://www.haomeiwen.com/subject/cijehctx.html