美文网首页Big Data
Spark Streaming概述

Spark Streaming概述

作者: 盗梦者_56f2 | 来源:发表于2018-08-06 18:16 被阅读13次

    简介

    Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,,高吞吐的,容错的实时数据流的处理。数据可以通过多种数据源获取, 例如 Kafka,Flume,Kinesis,HDFS,s3,Twitter 以及 TCP sockets,也可以通过例如 map,reduce,join,window 等的高级函数组成的复杂算法处理。最终,处理后的数据可以输出到文件系统(HDFS),数据库(database)以及实时仪表盘中。
    在内部,它工作原理如下,Spark Streaming 接收实时输入数据流并将数据切分成多个 batch(批)数据,然后由 Spark 引擎处理它们以生成最终的 分批流结果。
    Spark Streaming 提供了一个名为 DStream的高级抽象,它代表一个连续的数据流。DStream 可以从数据源的输入数据流创建,例如 Kafka,Flume 以及 Kinesis,或者在其他 DStream 上进行高层次的操作以创建。在内部,一个 DStream 是通过一系列的 RDDs来表示。

    创建StreamingContext

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ 
    val conf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    #或者如下
    val sc = ...                // 已存在的 SparkContext
    val ssc = new StreamingContext(sc, Seconds(1))
    #socket
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start() 
    ssc.awaitTermination() 
    #ssc.stop()
    #文件系统
    val lines = ssc.textFileStream(dataDirectory)
    #kafka
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    #kafka
    import org.apache.spark.streaming.kafka._
    #topic1,topic2
    val topicsSet = topics.split(",").toSet
    #broker1-host:port,broker2-host:port
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)
    #flume
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume._
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    #flume
    val stream = FlumeUtils.createPollingStream(ssc, host, port)
    

    相关文章

      网友评论

        本文标题:Spark Streaming概述

        本文链接:https://www.haomeiwen.com/subject/ilvgvftx.html