美文网首页
Structure Streaming[Official Doc

Structure Streaming[Official Doc

作者: 奉先 | 来源:发表于2018-02-07 10:23 被阅读480次

    1. Overview:

    Structured Streaming是基于Spark SQL引擎的可扩展、具有容错性的流处理引擎。系统通过checkpointing和写Ahead Logs的方式保证端到端的只执行一次的容错保证。Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing。

    2. Word Count

    下面实现一个简单的基于Structured Streaming的WordCount程序。程序监听服务器的TCP Socket,对监听的内容做word count操作。为了测试,在服务器端开启命令行来发送数据:

    $nc -lk 9999
    

    1.首先,为了使用Structured Streaming的API,需要先引入SparkSQL和Spark Structured Streaming的maven依赖:

        <!-- Apache Spark Structured Streaming -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>2.1.0</version>
        </dependency>
        <!-- Apache Spark Structured SQL -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.1.0</version>
        </dependency>
    

    2.代码实现(scala):
    lines(是一个DataFrame)代表一个包含流文本数据的无边界的表,表只包含一个字段"value",流数据中的每一行就是该表的一条记录。
    scala的实现方法:

    object StructureStreamingWordCount {
      def main(args: Array[String]) {
        //create a local SparkSession
        val spark = SparkSession
          .builder
          .appName("SparkWordCountNetWork")
          .getOrCreate()
        //listen to 10.198.193.189:9999,获得来自该端口的流数据。
        import spark.implicits._
        val lines = spark.readStream
          .format("socket")
          .option("host","10.198.193.189")
          .option("port",9999)
          .load()
        //需要使用as方法将DataFrame转换成DataSet,才能使用flatMap转换操作。
        val words = lines.as[String].flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
    
        val query = wordCounts.writeStream
          //print the complete set of counts
          .outputMode("complete")
          .format("console")
          .start()
        query.awaitTermination()
      }
    }
    
    1. 打包代码成jar包,提交这个jar包:
    ./bin/spark-submit /home/natty/spark/log-analyzer.jar \
    com.pmpa.bigdata.spark.app.core.StructureStreamingWordCount
    

    4.通过nc命令发送句子,测试结果。

    +------+-----+
    | value|count|
    +------+-----+
    |apache|    1|
    |hadoop|    3|
    +------+-----+
    

    3. Programming Model

    Structured Streaming的核心思想是把实时数据当做一个持续写入的表,这样一来,流处理模型和批处理模型就很相似了。
    1.基本概念:
    将输入数据流理解为“Input Table”。数据流中每一条记录是新插入input table的一条记录。


    将数据流理解为无边界表

    针对输入的查询会生成“Result Table”。每个触发间隔(下图中是1s),新的记录追加到input table,并最终更新Result Table。每当Result Table被更新,我们都会将变化的记录行写入外部sink。


    Programming Model
    “Output”是指需要写入外部存储的内容。output可以定义为3种模式:
    2.WordCount程序分析:
    对于上边的Word Count程序,DataFrame “lines”是Input Table,DataFrame “wordCounts”是result table。
    model of wordcount

    Spark Structured Streaming模型与其他的流处理引擎有着很大的区别。很多流系统要求用户自行执行聚合计算,所以需要自行保证容错、数据一致性(至少计算一次、至多计算一次、只计算一次)。但是在Spark Structured Streaming中,在有新的数据更新时,Spark来负责更新Result Table。

    4. spark streaming:

    Spark
    SparkCore
    RDD
    SparkContext
    SparkSQL
    DataFrame
    SQLContext
    SparkStreaming
    DStream
    将流式数据分成很多RDD,按照时间间隔1s/5s batch
    StreamingContext
    streaminContext = new StreamingContext(sc, Seconds(5))
    HADOOP:
    MapReduce
    Hive/HBase
    Storm

    实时流式计算框架
    Storm
    storm.apache.org
    JStorm
    捐献Apache
    互联网电商企业,如果大数据的话,思想:BAT公司都使用的东西,小的公司当然也可以解决数据分析处理。

    用户推荐
    JAVA/C
    Mahout
    0.9.x以后,就不支持MapReduce,迁移到Spark,2014年初宣布

    SparkStreaming
    -1,实时统计,累加
    kafka + sparkstreaming(updateStatByKey)
    -2,实时统计,最近一段时间指标
    实时查看最近一个小时之内的用户点击量,各省或者重点城市
    window

    抛出问题:
    SparkStreaming
    -1,数据来源
    Socket
    Flume
    Kafka ---- 最多
    val kafkaDStream = streamingContext.kafka()
    -2,数据处理
    DStream#flatMap(),map(),filter(),reduceByKey,...
    -3,数据输出(Output)
    内存数据库
    Redis
    HBase
    JDBC

            dstream#foreachRDD(rdd => {
              rdd.toDF.write.jdbc
            })
    

    官网案例:
    SparkStreamin从Socket接收数据,WordCount统计,结果输出控制台

    对于SparkStreaming开发测试来说,尤其是local
    至少是
    --master local[2]
    -#,2:代表的是启动2个Thread
    -#,需要一个Thread进行接收数据
    Recevier
    -#,另外一个Thread用于运行Task
    进行数据的处理

    对于GraphX
    一张图
    顶点 -> RDD[(Long, T)]
    vertexId
    vertexAttributes
    边 -> RDD[(Long, Long, T)]
    SourceVertexId
    DestVertexId
    EdgeAttribute

    ======================================================
    -1,Recevier
    block interval
    默认值200ms
    接收的数据,分成block,进行存储在Executors中

    -2,StreamingContext
    batch interval
    处理数据的时间间隔,每次处理多长时间范围内的数据
    ssc = new StreamingContext(sc, Second(1))

    t-0 t-200 t-400 t-600 t-800 t-1000 t-1200 t-1400
    [blk-01 blk-02 blk-03 blk-04 blk-05 ] blk-06 blk-07
    |
    RDD
    |
    Process

    RDD
    sc.textFile()
    DStream
    ssc.textFileStream

    ==
    DataFrame
    spark.read.json("")
    Struct Streaming
    spark.read.streaming.json("")

    spark-shell
    load script
    scala> :load /opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/WordCount.scala

    作业:
    @Experimental
    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
    }

    使用新的API进行实时累加统计

    陈超
    七牛技术总监
    国内Spark技术布道者
    0.9.0

    =======================================================
    DStream#transform(func)

    ======================================================
    需求:
    词频统计
    每次统计最近10秒的数据

    ======================================================
    SparkStreaming与Flume和Kafka的集成
    Recevier
    接收数据
    针对Flume来说
    Flume Agent
    Source -> Chennal -> Sink
    将数据写给SparkStreaming
    -1,push
    将数据推个Recevier
    block interval
    200ms
    blk-01 blk-02 blk-03 ...
    Executors
    batch interval

        特殊情况的时候
            -1,应用全部停止
            -2,未处理的block所在所有的Executor停止
            
        仅仅接收的了数据,但是不能保证所有的接收数据全部被处理?    
        
    Spark 1.3.x开始,性能考虑,出现
        
    -2,pull
        拉取数据
        最好
        offset
            10     50
        batch job
            RDD:
                sink:10 -50 
            SparkJob的时候,直接到FlumeSink这边拿去数据,然后处理,结构输出,更新offset
            
        -1,数据不会丢失
        -2,数据不会被重复处理
    

    bin/spark-shell
    --master local[3]
    --jars externallibs/mysql-connector-java-5.1.27-bin.jar,
    externallibs/spark-streaming-flume_2.10-1.6.1.jar,
    externallibs/flume-avro-source-1.5.0-cdh5.3.6.jar,
    externallibs/flume-ng-sdk-1.5.0-cdh5.3.6.jar

    bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/spark-push-flume.properties -Dflume.root.logger=DEBUG,console

    SparkStreaming与Kafka集成
    Direct方式
    没有Recevier

    batch interval
        RDD
        
    Kafka
        Topic
            part-01     part-02     part-03     part-04
              index
    SparkJob
        val rdd = invoke-kafka-consumer-api-topic-offset
        rdd.process
    

    Kafka Producer API
    JAVA
    SCALA

    bin/spark-shell
    --master local[3]
    --jars externallibs/mysql-connector-java-5.1.27-bin.jar,
    externallibs/spark-streaming-kafka_2.10-1.6.1.jar,
    externallibs/kafka_2.10-0.8.2.1.jar,
    externallibs/kafka-clients-0.8.2.1.jar,\
    externallibs/zkclient-0.3.jar,
    externallibs/metrics-core-2.2.0.jar

    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    val ssc = new StreamingContext(sc, Seconds(5))
    
    // Step 1:Recevier Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)
    
    // Step 2: Process Data Base DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    
    sc.stop
    
    /**
     =========================================================================
     */
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    val ssc = new StreamingContext(sc, Seconds(5))
    
    val socketDStream = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")
    
    val words = socketDStream.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    wordCounts.print()
    
    ssc.start()             
    ssc.awaitTermination()  
    
    
    ///
    // foreachFunc: (RDD[T], Time) => Unit
    dstream.foreachRDD(rdd => {
      // 将分析的数据存储到JDBC中,MySQL数据中
      val connection = createJDBCConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.putStateResult(record) // executed at the worker
      }
    })
    
    
    // 建议的模式,
    /**
        比如讲数据结果写入到MySQL数据库的某张表中
        -1,首先有一个工具类,专门连接数据库的一个连接池,池中有一些连接Connection
        -2,RDD操作时应该针对每个分区进行操作
            比如每个分区获取一个数据库的Connection
        -3,针对分区中的每天数据,一个个的插入到数据库的表中 
        -4,最后将数据库连接放回到连接池中,以便其他进程使用
    */
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    
    
    
    /**
     =========================================================================
     */
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    val ssc = new StreamingContext(sc, Seconds(2))
    
    // Step 1:Recevier Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)
    
    // Step 2: Process Data Base DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(8), Seconds(4))
    
    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    
    sc.stop
    
    
    
    // ===============================================
    
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.flume._
    
    val ssc = new StreamingContext(sc, Seconds(5))
    
    // Step 1:Recevier Data From Where
    // Flume: FlumeUtils, Kafka: KafkaUtils
    val flumeDStream = FlumeUtils.createStream(ssc, "hadoop-senior01.ibeifeng.com", 9988).map(event => new String(event.event.getBody.array()))
    
    // Step 2: Process Data Base DStream
    // DStream[Long] 
    val words = flumeDStream.flatMap(_.split(" "))
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
        
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    
    sc.stop
    

    5. spark sql

    思考:
    Hive 实质什么?
    Hive 如何将SQL转换为MapReduce?

    Oozie:
        MapReduce Action
    

    Dremel
    -1,Presto
    -2,Impala
    游戏公司
    --1,yum rpm 安装
    --2,CM
    Flume + Kafka + HBase + Impala + JAVA + Python
    -3,Drill

    研发性学习:
    http://kylin.apache.org/

    ====================================================
    Spark SQL 前世今生
    -1,1.0版本以前
    Shark = Hive on Spark
    -2,1.0.x版本
    Spark SQL
    alpha 版本
    -3,1.3.x版本
    DataFrame
    release版本
    -4,1.5.x版本
    钨丝计划
    -5,1.6.x版本
    DataSet
    -6,2.0版本
    。。。。

    总结一点:
    Spark SQL开始的话,替换Hive底层,spark SQL与Hive完全兼容,尤其HQL语句。

    Shark
    -1,Spark SQL
    alpha 版本
    -2,Hive on Spark

    Hive
    -1,MapReduce
    -2,Spark
    -3,Tez

    ====================================================

    Spark SQL 处理Hive表中的数据
    -1,MetaStore
    MySQL数据库
    hive-site.xml
    -2,数据库驱动
    mysql-*.jar

    bin/spark-shell --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar

    bin/spark-sql --master local[2] --jars /opt/cdh5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar

    ====================================================

    -1,对于Spark Core来说,应用程序的入口
    SparkContext
    -2,对于Spark SQL来说,应用程序的入口
    SQLContext -> HiveContext
    SparkContext

    --1, SparkCore: RDD
    scala> case class People(name: String, age: Int)
    defined class People

    scala> val rdd = sc.textFile("/user/beifeng/people.txt").map(line => line.split(",")).map(x => People(x(0), x(1).trim.toInt))

    rdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at <console>:29

    --2,SparkSQL: DataFrame
    scala> val df = sqlContext.read.json("/user/beifeng/people.json")

    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

    scala> val hive_emp_df = sqlContext.read.table("default.emp")
    hive_emp_df: org.apache.spark.sql.DataFrame = [empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: double, comm: double, deptno: int]

    scala> hive_emp_df.schema
    res2: org.apache.spark.sql.types.StructType = StructType(StructField(empno,IntegerType,true), StructField(ename,StringType,true), StructField(job,StringType,true), StructField(mgr,IntegerType,true), StructField(hiredate,StringType,true), StructField(sal,DoubleType,true), StructField(comm,DoubleType,true), StructField(deptno,IntegerType,true))

    ==========================================
    如何创建DataFrame
    -1,外部数据源-内置
    --1,json
    --2,hive
    --3,jdbc
    --4,parquet/orc
    --5,text -没有用处
    -2,从RDD转换
    --1,方式一:
    RDD[CACECLASS]
    --2,方式二:
    自定义schema
    -3,外部数据源 - 需要自己依据接口开发
    --1,ES
    检索
    --2,HBase
    华为,开源
    --3,solr
    ...

    scala> val hive_dept_df = sqlContext.read.table("default.dept")

    def jdbc(url: String, table: String, connectionProperties: Properties)
    需求:
    将DataFrame数据写到MYSQL数据库表中

    val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
    import java.util.Properties
    val props = new Properties()

    hive_dept_df.write.jdbc(url, "tb_dept", props)

    问题:
    scala> hive_dept_df.write.jdbc(url, "tb_dept", props)
    java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:278)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
    解决方案:
    export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/externallibs/mysql-connector-java-5.1.27-bin.jar
    bin/spark-shell --master local[2]

    案例:
    对不同数据源的表进行JOIN,一个是在Hive中,一个是在MySQL
    -1,emp
    hive
    -2,dept
    mysql
    -3,join

    val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test?user=root&password=123456"
    import java.util.Properties
    val props = new Properties()

    val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)

    val hive_emp_df = sqlContext.read.table("default.emp")

    // join
    val join_df = hive_emp_df.join(mysql_dept_df, "deptno")

    join_df.registerTempTable("join_emp_dept")

    sqlContext.sql("select empno, ename, deptno, deptname, sal from join_emp_dept order by empno").show

    ===========================================================
    如何从RDD创建DataFrame
    方式一:
    The first method uses reflection to infer the schema of an RDD that contains specific types of objects.
    简单说法:
    RDD[CASE CLASS]
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    方式二:
    The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.
    a DataFrame can be created programmatically with three steps:
    --1,RDD -> RDD[Row]
    import org.apache.spark.sql._
    演示:
    val rdd = sc.textFile("/user/beifeng/people.txt")
    import org.apache.spark.sql._
    val rowRdd = rdd.map(line => line.split(", ")).map(x => Row(x(0), x(1).toInt))
    --2,Create the schema
    import org.apache.spark.sql.types._
    val schema = StructType(StructField("name",StringType,true) :: StructField("age",IntegerType,true) :: Nil)
    --3,Apply the schema to the RDD of Rows
    val people_df = sqlContext.createDataFrame(rowRdd, schema)

    =========================================================
    Spark 1.6
    Dataset
    数据集

    相关文章:
    -1,https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
    -2,https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

    val url = "jdbc:mysql://hadoop-senior01.ibeifeng.com:3306/test"
    import java.util.Properties
    val props = new Properties()
    props.put("user","root")
    props.put("password","123456")

    val mysql_dept_df = sqlContext.read.jdbc(url, "tb_dept", props)

    Spark SQL & ES
    http://blog.csdn.net/stark_summer/article/details/49743687

    回顾SparkSQL
    -1,处理的数据90%来源Hive表
    Hive数据仓库
    -2,过程
    HADOOP -> HIVE -> SparkSQL
    建议:
    编写项目时,最后写上使用SparkSQL在调研,在运行,尝试

    -3,程序入口
        SQLContext & HiveContext
        DataFrame
            DataFrame -> Dataset -> RDD
        -a,RDD[ROW] = DataFrame
            创建两种方式
            -#,CASE CLASS
            -#,RDD[ROW],schema(StructType,StructField)
        -b,外部数据源
            sqlContext.read.
                .json
                .parquet
                .jdbc
                .table
            方便了异步数据源的数据连接处理JOIN
        -c,与Hive的集成
            sqlContext.sql("select * from emp").show
        -d,ThriftServer
            多,启动一个Spark Application,多个客户连接
            beeline
            jdbc
        -e,存储分析结果
            dataframe.write
                .jdbc
                .table
                .text
                .json
    

    ========================================================
    Spark如何与HBase进行集成??
    Spark 如何与HBase进行交互?
    1:读数据
    2: 写数据

    -1,SparkCore
        read:
            hbase-table -> RDD
        write:
            RDD -> Cell
        
        作业:
            如何使用saveAsNewAPIHadoopDataset将Spark RDD数据保存到HBase表中。
        
    -2,SparkSQL
        SparkSQL -> Hive  -> HBase
        此方式
            目前来说,仅限于读取,不建议写入,可能有问题。
        研发型作业:
            测试集成方式
            注意:
                -#,Hive与HBase集成时,需要将HBASE CLIENT相关JAR包放入到$HIVE_HOME/lib
                -#,SPARKSQL名义看是读取Hive表的数据,其实读取的还是HBase表的数据,需要将HBASE CLIENT相关JAR包放入CLASSPATH
        
    -3,SparkSQL External DataSource API
        https://github.com/Huawei-Spark/Spark-SQL-on-HBase
    

    MapReduce 框架中
    input -> map -> shuffle -> reduce -> output

    1:intput
        读取要处理的数据
        // set input path
        FileInputFormat.setPath(new Path("/user/beifeng/mr-wc-in"))
        
        (Key, Value)
            Key:
                offset 偏移量
            value:
                每一行的数据
                
        RecordReader
            每条记录读取
    
        TextInputFormat
            LineRecordReader
    

    MapReduce API
    从HADOOP 0.20.0版本开始,有了新的API
    -#,old
    org.apache.hadoop.mapred.*
    Mapper\Reducer -> Interface
    -#,new
    org.apache.hadoop.mapreduce.*
    Mapper\Reducer -> class

    val rdd = sc.textFile("Readme.md")

    Spark去读取HDFS(HFile)上的文件数据时,与MapReduce读取数据时一样一样的

    HBase与MapReduce集成
    -1,Mapper
    TableMapper
    Mapper从HBase表中读取数据
    (ImmutableBytesWritable, Result)
    mapreduce如果从HBase表读取数据,也是一条一条的读取,
    TableInputFormat
    TableRecordReader

    ===================================================
    SparkSQL
    -1,SparkSQL中的聚合函数

    在DataFrame类中,有如下五组函数

    • @groupname basic
      Basic DataFrame functions
    • @groupname dfops
      Language Integrated Queries
    • @groupname rdd
      RDD Operations
    • @groupname output
      Output Operations
    • @groupname action
      Actions

    SQL and DSL(Domain )
    emp_df.agg("sal" -> "avg", "comm" -> "max").show
    emp_df.agg(Map("sal" -> "avg", "comm" -> "max")).show
    emp_df.agg(max($"comm"), avg($"sal")).show

    emp_df.groupBy($"deptno").agg(max($"comm"), avg($"sal")).show
    
    
    -2,自定义函数UDF
        回顾一下:
            Hive中自定义函数
                -1,extends UDF
                -2,override evaluate
        对于Spark来说
            SCALA,函数式编程
            匿名函数
    

    if函数
    if(condition, true-value, false-value)

    sqlContext.udf.register(
    "trans_comm", // 函数名称
    (comm: Double) => {
    if(comm.toString == ){
    0.0
    }else{
    comm
    }
    }// 函数
    )

    ===========================================================
    如何自定义UDAF?
    avg:
    求平均值
    多 对 一
    输入多个值,输出一个值

    平均值:
        -0,total = 0.0  // 3000 + 2500 + 4200 + 2100 + 5000
        -1,count = 0 // 5
        
        缓冲数据
    

    Spark/MapReduce
    block block block
    | | |
    part-01 part-02 part-03
    | | |
    Task Task Task
    avg avg avg

    sqlContext.udf.register(
    "avg_sal",
    AvgSalUDAF
    )

    sqlContext.sql("select deptno, avg(sal) avg_sal, avg_sal(sal) as_self from emp group by deptno").show

    ==================================================
    Hive中分析函数
    http://lxw1234.com/archives/tag/hive-window-functions
    SparkSQL同样也支持
    Spark 1.4.0
    SPARK-1442: Window functions in Spark SQL and DataFrames

    ROW_NUMBER
    
    针对emp表来说
        获取各部门中工资Top3的人员信息
    -1,按照部门进行分组
        group by deptno
    -2,对各部门人员工资降序排序
        order by sal desc
    -3,获取前几个
        TopKey
    

    SELECT
    empno, ename, sal, deptno,
    ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
    FROM
    emp ;

    empno ename sal deptno rnk
    7839 KING 5000.0 10 1
    7782 CLARK 2450.0 10 2
    7934 MILLER 1300.0 10 3

    7788 SCOTT 3000.0 20 1
    7902 FORD 3000.0 20 2
    7566 JONES 2975.0 20 3
    7876 ADAMS 1100.0 20 4
    7369 SMITH 800.0 20 5

    7698 BLAKE 2850.0 30 1
    7499 ALLEN 1600.0 30 2
    7844 TURNER 1500.0 30 3
    7521 WARD 1250.0 30 4
    7654 MARTIN 1250.0 30 5
    7900 JAMES 950.0 30 6

    SELECT
    empno, ename, sal, deptno
    FROM(
    SELECT
    empno, ename, sal, deptno,
    ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) rnk
    FROM
    emp
    ) t
    WHERE
    t.rnk <= 3 ;

    empno ename sal deptno
    7839 KING 5000.0 10
    7782 CLARK 2450.0 10
    7934 MILLER 1300.0 10

    7788 SCOTT 3000.0 20
    7902 FORD 3000.0 20
    7566 JONES 2975.0 20

    7698 BLAKE 2850.0 30
    7499 ALLEN 1600.0 30
    7844 TURNER 1500.0 30

    数据统计分析:
    -1,过滤,清洗
    -2,分组
    -3,统计
    -4,排序
    -5,TopKey

    6 spark core

    关键知识
    -1,MapReduce
    思想
    shuffle
    -2,Hive
    ETL
    Spark SQL
    前世今生
    数据仓库
    整理数据
    DataFrame

    ===================================================================
    Spark 课程安排:
    -1,SCALA
    一天课程
    -2,Spark Core
    两天课程
    -3,SparkSQL
    一天课程
    -4,Spark Streaming
    一天课程
    -5,项目
    三天课程
    -6,Spark MLlib(可选)
    一天课程

    ===========================================================

    腾讯:
    08年的时候,几百台服务器,日志数据信息。

    分析日志数据:
    -1,服务器是否出现故障,运维

    -2,为产品的设计
        产品经理
    

    阶段一:
    Python + MySQL
    实现:
    -1,每天服务器上面,都会安装MySQL数据库
    日志数据存储到MySQL数据库表中
    --1,Python
    脚本
    --2,某个条件,对日志数据进行分类
    不同的数据存储到不同的MySQL数据库表
    -2,基于SQL进行分析
    分散 -> 存储 -> 分析 -> 聚合 -> 分散 -> 存储 -> 分析

    阶段二:
    HADOOP + Hive
    实现:
    -1,将日志数据放到HDFS中,使用MapReduce进行数据分类、清洗、过滤
    数据加载到Hive表中
    -2,HiveQL
    批处理,离线分析,速度相对来说,较慢,尤其针对数据量非常大的时候。

    业务需求的增加:
        机器学习,数据分析,数据挖掘
            ---1,迭代计算
                循环
    
            MapReduce:
                Map Task -> local filesystem -> Reduce Task -> hdfs 
    
        基于MapReduce的机器学习框架
            Mahout
            --0,在2014年上半年官方就说了,底层不在支持MapReduce,支持Spark
    

    阶段三:
    HADOOP + SPARK
    实现:
    -1,数据还是存储在HDFS/Hive

        -2,数据分析
            Spark
            其他框架作为辅助
            Hive + Python + 。。。。
    
    为什么Spark???????
        数据结构:
            RDD
                -1,集合(List[Type])
                -2,数据存储在内存中
                -3,分区存储
                    hdfs 分块block
                -4,对每个分区中的数据进行运算
                    mapreduce input -> block : map task
    
                input  -> process -> output
    

    MapReduce:
    input -> (map -> shuffle -> reduce) -> output
    Spark:
    input -> (rdd -> rdd -> rdd -> rdd -> ...) -> output

    val list = List(1,2,3,4)
    list.map(x => x * 2)

    AMPLab
    A:算法 M: 机器 P:人类

    Spark 学习建议:
    -1,把握核心
    RDD
    DataFrame
    DStream
    -2,思考,源码,对比
    案例

    =============================================================

    Apache Spark™ is a fast and general engine for large-scale data processing.
    关键词:
    -1,海量数据处理计算框架
    核心:数据结构RDD
    -2,fast
    速度快
    比较性:
    MapReduce

            比较的数据:
                相同数据,相同逻辑,不同的编码
                --1,内存
                    100+
                --2,磁盘
                    10+
    -3,general
        通用
    

    HADOOP生态系统:
    MapReduce Hive Mahout Storm Graphi

    Spark 生态栈:
    Core SparkSQL MLlib SparkStreaming Graphx
    JAR包:

    -1,Spark Application运行everywhere
    local、yarn、memsos、standalon、ec2
    -2,处理数据
    来源一切(Spark SQL:json\parquet\orc\xml\jdbc\solr\es\tsv...)
    hdfs\hive\hbase...

    ==============================================================

    Spark Timeline
    -1,Spark 1.0以前
    0.9

    -2,1.0
    版本发布

    -3,Spark 2015年的发展
    1.2\1.3\1.4\1.5\1.6
    五大版本
    1.3
    Spark SQL:DataFrame
    钨丝计划
    性能优化
    -4,Spark 2.0
    更加简单、更加快速、更加智能

    学习Spark三大网站:
    -1,官方文档
    http://spark.apache.org/
    -2,源码
    https://github.com/apache/spark
    -3,官方博客
    https://databricks.com/blog

    Hive 执行引擎:
    -1,MapReduce
    -2,Spark
    https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
    -3,Tez
    Hortonworks

    针对APACH HADOOP
    ./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver -Pyarn

    针对CDH HADOOP
    ./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Pyarn

    ========================================================
    Local
    YARN/Standalone

    --Spark Local
    -1,JAVA
    -2,HDFS
    -3,SCALA
    -4,Spark

    大记住:
    bin/spark-submit
    类似于
    bin/yarn
    提交Spark Application的脚本就是spark-submit

    spark-shell
    spark-submit

    =====================================================
    所有的Spark程序来说,程序的入口是SparkContext

    分析数据:
    step 1: input
    rdd = sc.textFile("/user/beifeng/xx")
    step 2: process
    rdd.map().filter().reduceByKey()
    step 3: output
    rdd.saveAsTextFile()

    16/06/26 20:01:56 INFO BlockManagerMaster: Registered BlockManager
    16/06/26 20:01:56 INFO SparkILoop: Created spark context..
    Spark context available as sc.

    rdd = sc.textFile("/user/beifeng/xx")
    类比:
    默认情况下,MapTask如何读取数据,一行一行的读取数据<key,value>
    在Spark中
    也是一行一行的读取数据,每行数据是String

    WortCount
    -1,MapReduce
    -2,Spark
    -3,Flink
    http://flink.apache.org/

    input

    val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

    process

    val wordCountRdd = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) => (a + b))

    output

    wordCountRdd.saveAsTextFile("/user/beifeng/spark/wordcount/output")

    =====================
    -1,不是说MapReduce框架,思想不好,而不是不适合做某些事情
    (key,value)
    -2,Spark 数据处理中
    我们更多是与MapReduce类似,将数据转换为(key,value)进行处理

    val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

    rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)

    ==================================================================

    总结:
    -1,认识Spark
    RDD
    --1,有哪些?
    --2,用哪些?
    Spark Applicaiton???
    --1,YARN
    目前
    --2,Standalone/Mesos
    自身带分布式资源管理管理和任务调度

    hadoop 2.x release 2.2.0 2013/10/15
    hadoop 2.0.x - al
    hadoop 2.1.x - beta

    Cloudera
    cdh3.x - 0.20.2
    cdh4.x - 2.0.0
    HDFS -> HA:QJM ; Federation
    Cloudera Manager 4.x
    cdh5.x

    ================================================

    Spark Standalon Mode
    认为
    Spark 本身自带的一个分布式资源管理系统以及任务调度的框架

    类似于YARN这样的框架
    分布式
    主节点:
    Master - ResourceManager
    从节点:
    Works - NodeManagers

    start-slaves.sh
    启动所有的从节点,也就是Work
    注意:
    使用此命令时,运行此命令的机器,必须要配置与其他机器的SSH无密钥登录,否则启动的时候会出现一些问题,比如说输入密码之类的。

    对于Spark Application
    两部分组成:
    -1,Driver Program -> 4040 4041 4042
    main
    SparkContext ---最最重要
    -2,Executor
    JVM(进程)
    运行我们Job的Task

    REPL:
    shell交互式命令

    Spark Application
    Job-01
    count
    Job-02
    Stage-01
    Task-01(线程) -> Map Task(进程)
    Task-02(线程) -> Map Task(进程)
    每个Stage中的所有Task,业务都是相同的,处理的数据不同
    Stage-02
    Job-03

    从上述运行程序案例来看:
    如果RDD调用的函数,返回值不是RDD的时候,就会触发一个JOB,进行执行。

    思考:
    reduceByKey 到底做了什么事情呢?
    -1,分组
    将相同key的value放在一起
    -2,对value进行reduce
    进行合并

    经分析,对比MapReduce中WordCount程序运行,推断出Spark JOB中Stage的划分依据RDD之间是否产生Shuffle进行划分的。
    

    =============================================================

    val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

    val wordCountRdd = rdd.flatMap(.split(" ")).map((,1)).reduceByKey(_ + _)

    需求:
    安装词频进行倒排,获取前3个单词(TOP KEY)

    wordCountRdd

    ================================
    在企业中,如何开发Spark Application
    spark-shell + IDEA
    -1,在IDEA中编写代码

    -2,在spark-shell中执行代码
    
    -3,使用IDEA将代码打包成JAR包,使用bin/spark-submit进行提交应用
    

    ================================
    Spark HistoryServer
    监控运行完成的Spark Applicaiton。

    分为两个部分:
    第一、设置SparkApplicaiton在运行时,需要记录日志信息

    第二、启动HistoryServer,通过界面查看

    =================================================
    需求一:
    The average, min, and max content size of responses returned from the server.
    ContentSize
    需求二:
    A count of response code's returned.
    responseCode
    需求三:
    All IPAddresses that have accessed this server more than N times.
    ipAddresses
    需求四:
    The top endpoints requested by count.
    endPoint

    mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scala-tools.org/repo-releases -DgroupId=com.ibeifeng.bigdata.spark.app -DartifactId=log-analyzer -Dversion=1.0

    Hive:
    将文件中的数据映射到一张表中,

    正则表达式:

    Option
    Some
    有值
    None
    无值
    16/07/16 18:46:51 INFO SparkContext: Successfully stopped SparkContext
    Exception in thread "main" java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
    at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
    at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:126)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:113)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:63)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:76)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:195)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:146)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:473)
    at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$.main(LogAnalyzer.scala:16)
    上述原因:
    由于JAR冲突导致。

    回顾:
    -1,了解认识Spark
    MapReduce比较
    “四大优势”
    --1,速度快
    --2,使用简单
    --3,一栈式
    --4,无处不在的运行
    开发测试
    SCALA: REPL/Python
    -2,Spark Core
    两大抽象概念
    --1,RDD
    集合,存储不同类型的数据 - List
    ---1,内存
    memory
    ---2,分区
    hdfs: block
    ---3,对每个分区上数据进行操作
    function
    --2,共享变量shared variables
    ---1,广播变量

            ---2,累加器
                计数器
    -3,环境与开发
        --1,Local Mode
            spark-shell
        --2,Spark Standalone
            配置
            启动
            监控
            使用
        --3,HistoryServer
            -1,针对每个应用是否记录eventlog
            -2,HistoryServer进行展示
        --4,如何使用IDE开发Spark Application
            -1,SCALA PROJECt
                如何添加Spark JAR包
            -2,MAVEN PROJECT
    

    =================================================
    Spark 开发
    step 1:
    input data -> rdd/dataframe
    step 2:
    process data -> rdd##xx() / df#xx | "select xx, * from xx ..."
    step 3:
    output data -> rdd.saveXxxx / df.write.jdbc/json/xxx

    ================================================
    问题:
    16/07/23 09:38:13 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
    16/07/23 09:38:13 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
    java.lang.RuntimeException: Cannot parse log line: mail.geovariances.fr - - [09/Mar/2004:05:02:11 -0800] "GET /twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif HTTP/1.1" 304 -
    at com.ibeifeng.bigdata.spark.app.core.ApacheAccessLog$.parseLogLine(ApacheAccessLog.scala:38)
    at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
    at com.ibeifeng.bigdata.spark.app.core.LogAnalyzer$$anonfun$5.apply(LogAnalyzer.scala:25)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    闭包closures:
    JS/Jquery

    • Internally, each RDD is characterized by five main properties:

      • A list of partitions
        protected def getPartitions: Array[Partition]
      • A function for computing each split
        @DeveloperApi
        def compute(split: Partition, context: TaskContext): Iterator[T]
      • A list of dependencies on other RDDs
        protected def getDependencies: Seq[Dependency[_]] = deps
      • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
        /** Optionally overridden by subclasses to specify how they are partitioned. */
        @transient val partitioner: Option[Partitioner] = None
      • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    • an HDFS file)
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil

    RDD

    回顾WordCount

    val rdd = sc.textFile("xxx")

    val wordRdd = rdd.flatMap(.split(" "))
    val kvRdd = wordRdd.map((
    , 1))

    kvRdd.groupByKey().map(tuple => {
    (tuple._1, tuple.2.list.reduce( + _))
    })

    val wordcountRdd = kvRdd.reduceByKey(_ + _)

    wordcountRdd.saveAsTextFile("yy")

    kvRdd <- wordRdd <- rdd

    wordRdd <- rdd

    创建RDD的两种方式:
    方式一:
    并行化集合
    List\Seq\Array
    SparkContext:
    def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T]
    方式二:
    外部存储系统

    方式:
    RDD Transformation

    ==================================
    对于分布式计算框架来说,性能瓶颈
    IO
    -1,磁盘IO
    -2,网络IO

    rdd1 -> rdd2
        Shuffle
    

    ============================================
    groupByKey() & reduceByKey()

    在实际开发中,如果可以使用reduceByKey实现的功能,就不要使用groupBykey
    使用reduceByKey有聚合功能,类似MapReduce中启用了Combiner

    ===============
    join()
    -1,等值链接

    -2,左连接
    

    数据去重
    结果数据
    res-pre.txt - rdd1
    新数据进行处理
    web.tsv - 10GB - rdd2
    解析里面的url,
    如果res-pre.txt中包含,就不放入,不包含就加入或者不包含url进行特殊处理

    rdd2.leftJoin(rdd1)

    ===================================================

    Group Top Key

    aa 78
    bb 98
    aa 80
    cc 98
    aa 69
    cc 87
    bb 97
    cc 86
    aa 97
    bb 78
    bb 34
    cc 85
    bb 92
    cc 72
    bb 32
    bb 23

    rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))

    =======================================================
    SparkContext
    -1,向Master(主节点,集群管理的主节点)申请资源,运行所有Executor
    -2,创建RDD的入口
    sc.textFile("") // 从外部存储系统创建
    sc.parxx() // 并行化,从Driver 中的集合创建
    -3,调度管理JOB运行
    DAGScheduler 、 TaskScheduler
    --3.1
    为每个Job构建DAG图
    --3.2
    DAG图划分为Stage
    按照RDD之间是否存在Shuffle
    倒推(Stack)
    --3.3
    每个Stage中TaskSet
    每个阶段中Task代码相同,仅仅处理数据不同

    前面讲解WordCount程序
    词频统计

    . ? # $ 毫无意义,无需统计

    val list = List(".", "?", "#","$")

    val rdd = sc.textFile("xxx")

    val wordRdd = rdd.flatMap(_.split(" "))

    val filterRdd = wordRdd.filter(word => !list.contains(word))

    val kvRdd = filterRdd.map((, 1))
    val wordcountRdd = kvRdd.reduceByKey(
    + _)

    wordcountRdd.saveAsTextFile("yy")

    使用广播变量:
    val list = List(".", "?", "!", "#", "$")
    val braodCastList = sc.broadcast(list)
    val wordRdd = sc.textFile("")
    wordRdd.filter(word => {
    braodCastList.value.contains(word)
    })

    对于Spark Applicaiton来说,很多初学者,头痛的一个问题就是
    外部依赖JAR包
    如下几种方式:
    方式一:
    --jars JARS
    Comma-separated list of local jars to include on the driver and executor classpaths.
    jar包的位置一定要写决定路径。

    方式二:
    --driver-class-path
    Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

    方式三:
    SPARK_CLASSPATH
    配置此环境变量

    企业中Spark Application提交,shell 脚本

    spark-app-submit.sh:

    !/bin/sh

    SPARK_HOME

    SPARK_HOME=/opt/cdh5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6

    SPARK CLASSPATH

    SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar

    ${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar

    ====================================================
    YARN
    -1,分布式资源管理
    主节点:ResouceManager
    从节点:NodeManager -> 负责管理每台机器上的资源(内存和CPU Core)
    -2,资源调度
    --1,容器Container
    AM/Task
    --2,对于运行在YARN上的每个应用,一个应用的管理者ApplicaitonMaster 资源申请和任务调度

    Spark Application
    -1,Driver Program
    资源申请和任务调度
    -2,Executors
    每一个Executor其实就是一个JVM,就是一个进程

    以spark deploy mode : client
    AM
    -- 全部都允许在Container中
    Executor s
    运行在Container中,类似于MapReduce任务中Map Task和Reduce Task一样

    Driver -> AM -> RM

    ============================================
    如何判断RDD之间是窄依赖还是宽依赖:
    父RDD的每个分区数据 给 子RDD的每个分区数据

        1    ->     1
    
        1    ->     N    :  MapReduce 中 Shuffle
    

    val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")

    val wordRdd = rdd.flatMap(.split(" "))
    val kvRdd = wordRdd.map((
    , 1))
    val wordcountRdd = kvRdd.reduceByKey(_ + _)

    wordcountRdd.collect

    input -> rdd  -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT
    

    ->

    wordcountRdd -> output            :Stage-02 -> ResultStage -> ResultTask
    
    • Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
    • other stage(s), or a result stage, in which case its tasks directly compute a Spark action
    • (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
    • track the nodes that each output partition is on.

    相关文章

      网友评论

          本文标题:Structure Streaming[Official Doc

          本文链接:https://www.haomeiwen.com/subject/girozxtx.html