Spark Streaming: Receiving Data

    Maven dependencies (pom.xml)

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.baozi</groupId>
        <artifactId>spark-learning</artifactId>
        <version>1.0-SNAPSHOT</version>
        <inceptionYear>2008</inceptionYear>
    
        <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.1.0</spark.version>
            <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
            <hbase.version>1.2.0-cdh5.7.0</hbase.version>
            <kafka.version>0.9.0.0</kafka.version>
        </properties>
    
        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <dependencies>
            <!-- Scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <!-- Spark Streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <!-- Hadoop -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <!-- HBase -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
    
            <!-- kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>${kafka.version}</version>
            </dependency>
    
            <!--
            If you hit:
            java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.ObjectIdResolver
            add this dependency. Match its version to the jackson version already pulled in by the other dependencies in the Maven dependency tree.
            -->
            <dependency>
                <groupId>com.fasterxml.jackson.module</groupId>
                <artifactId>jackson-module-scala_2.11</artifactId>
                <version>2.6.5</version>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    Processing socket data

    Start a netcat server on port 9999 to feed test data:

    $ nc -lk 9999
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object NetworkWordCount {
      def main(args: Array[String]): Unit = {
    
        // SparkConf
        val conf = new SparkConf()
          .setMaster("local[2]") // local至少两个,一个Receiver使用,一个执行处理操作
          .setAppName("NetworkWordCount")
          .set("spark.driver.host", "localhost")
    
        // StreamingContext; Seconds(n) sets the batch interval
        val ssc = new StreamingContext(conf, Seconds(5))
        
        // Core logic
        // socketTextStream defaults to StorageLevel.MEMORY_AND_DISK_SER_2:
        // serialized, kept in memory and on disk, replicated to 2 nodes
        val lines = ssc.socketTextStream("localhost", 9999)
        val result = lines.flatMap(_.split(" ")).map((_ , 1)).reduceByKey(_ + _)
        result.print()
    
        // Start the streaming computation and wait for it to terminate
        ssc.start()
        ssc.awaitTermination()
      }
    }
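
    The storage level mentioned in the comment can also be passed explicitly rather than relying on the default. A minimal sketch of just the receiving call, assuming the same localhost:9999 source as above:

    import org.apache.spark.storage.StorageLevel

    // explicit storage level: serialized, kept in memory and on disk, replicated to 2 nodes
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)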
    

    Processing file system data

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Process data from a file system (local or HDFS) with Spark Streaming.
      */
    object FileWordCount {
      def main(args: Array[String]): Unit = {
        // SparkConf
        val conf = new SparkConf()
          .setMaster("local") // 不需要Receiver
          .setAppName("FileWordCount")
          .set("spark.driver.host", "localhost")
    
        // StreamingContext
        val ssc = new StreamingContext(conf, Seconds(5))
    
        // Core logic: just point textFileStream at a directory
        val lines = ssc.textFileStream("file:///Users/baozi/temp-doc/ss")
        val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        result.print()
    
        // Start the streaming computation and wait for it to terminate
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    Create a test file in any directory (I used /Users/baozi/temp-doc/), then cp or mv it into the monitored directory (/Users/baozi/temp-doc/ss).
    1. Files placed in the monitored directory must all share the same data format.
    2. Each file must appear in the directory atomically (move it in as one step rather than writing it in place).
    3. Files that have already been processed are not processed again; modifying them afterwards has no effect.
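
    textFileStream is a thin convenience wrapper over the more general fileStream API. A minimal equivalent sketch, assuming the same monitored directory, with the Hadoop input types spelled out:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // key = byte offset in the file, value = one line of text
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat]("file:///Users/baozi/temp-doc/ss")
      .map(_._2.toString)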
    

    updateStateByKey

    This approach accumulates results across previous batches.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Word count using updateStateByKey.
      */
    object StatefulWordCount {
      def main(args: Array[String]): Unit = {
    
        // SparkConf
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("StatefulWordCount")
          .set("spark.driver.host", "localhost")
    
        // StreamingContext
        val ssc = new StreamingContext(conf, Seconds(5))
        // Stateful operators require a checkpoint directory.
        // In a real production environment this should live on HDFS.
        ssc.checkpoint(".")
    
        // Core logic
        val lines = ssc.socketTextStream("localhost", 9999)
        val result = lines.flatMap(_.split(" ")).map((_ , 1))
        // updateStateByKey accumulates results across batches, e.g.:
        // batch 1 input "a a a b b c"        -> (a,3) (b,2) (c,1)
        // batch 2 input "a a a b b c" again  -> (a,6) (b,4) (c,2)
        val state = result.updateStateByKey[Int](updateFunction _)
        state.print()
    
        // ~
        ssc.start()
        ssc.awaitTermination()
      }
    
      /**
        * @param curValues values for this key in the current batch
        * @param preValues accumulated value for this key so far
        * @return the new accumulated value
        */
      def updateFunction(curValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
        val current = curValues.sum
        val previous = preValues.getOrElse(0)
        Some(current + previous)
      }
    
    }
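
    Because the checkpoint directory also stores the DStream lineage, a production job would typically rebuild the StreamingContext from it on restart via StreamingContext.getOrCreate. A minimal sketch, where the HDFS path is only a made-up example:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs://namenode:8020/user/baozi/ss-checkpoint" // hypothetical path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("StatefulWordCount")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointDir)
      // build the DStream graph (socketTextStream, updateStateByKey, ...) here
      ssc
    }

    // reuse the checkpointed context if one exists, otherwise create a fresh one
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()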
    
