Spark Streaming Code

Author: 烈格黑街 | Published 2019-08-15 14:45

    \pom.xml[1]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.atguigu</groupId>
        <artifactId>spark</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
        <modules>
            <module>sparkCore</module>
            <module>sparkStreaming</module>
            <module>sparkSql</module>
            <module>sparkGraphx</module>
            <module>sparkMLlib</module>
        </modules>
    
        <properties>
            <mysql.version>6.0.5</mysql.version>
            <spring.version>4.3.6.RELEASE</spring.version>
            <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
            <log4j.version>1.2.17</log4j.version>
            <quartz.version>2.2.3</quartz.version>
            <slf4j.version>1.7.22</slf4j.version>
            <hibernate.version>5.2.6.Final</hibernate.version>
            <camel.version>2.18.2</camel.version>
            <config.version>1.10</config.version>
            <jackson.version>2.8.6</jackson.version>
            <servlet.version>3.0.1</servlet.version>
            <net.sf.json.version>2.4</net.sf.json.version>
            <activemq.version>5.14.3</activemq.version>
            <spark.version>2.1.1</spark.version>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
    
        <dependencies>
            <!-- Logging -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>jcl-over-slf4j</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <!-- Logging End -->
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
            </plugins>
            <pluginManagement>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    
    </project>
    

    \sparkStreaming\pom.xml[2]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>spark</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <packaging>pom</packaging>
    
        <artifactId>sparkStreaming</artifactId>
    
        <modules>
            <module>sparkstreaming_helloworld</module>
            <module>sparkstreaming_customerReceiver</module>
            <module>sparkstreaming_queueRdd</module>
            <module>sparkstreaming_kafka</module>
            <module>sparkstreaming_statefulWordCount</module>
            <module>sparkstreaming_windowWordCount</module>
        </modules>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
        </dependencies>
    </project>
    

    \sparkStreaming\sparkstreaming_customerReceiver\pom.xml[3]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_customerReceiver</artifactId>
    
        <dependencies>
    
        </dependencies>
    
        <build>
            <finalName>customwordcount</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.CustomReceiver</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    \sparkStreaming\sparkstreaming_customerReceiver\src\main\resources\log4j.properties[4]

    #
    # Copyright (c) 2017. WuYufei All rights reserved.
    #
    
    log4j.rootLogger=warn,stdout,R
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    
    log4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=spark.log
    log4j.appender.R.MaxFileSize=1024KB
    log4j.appender.R.MaxBackupIndex=1
    
    log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    
    

    \sparkStreaming\sparkstreaming_customerReceiver\src\main\scala\com\atguigu\streaming\CustomReceiver.scala[5]

    package com.atguigu.streaming
    
    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    // The type parameter String is the type of data this receiver delivers to Spark
    class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
      override def onStart(): Unit = {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          override def run() { receive() }
        }.start()
      }
    
      override def onStop(): Unit = {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
      }
    
      /** Create a socket connection and receive data until receiver is stopped */
      private def receive() {
        var socket: Socket = null
        var userInput: String = null
        try {
          // Connect to host:port
          socket = new Socket(host, port)
    
          // Until stopped or connection broken continue reading
          val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
    
          userInput = reader.readLine()
          while(!isStopped && userInput != null) {
    
            // store() is provided by the Receiver API and saves the received data into Spark
            store(userInput)
    
            userInput = reader.readLine()
          }
          reader.close()
          socket.close()
    
          // Restart in an attempt to connect again when server is active again
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            // restart if could not connect to server
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            // restart if there is any other error
            restart("Error receiving data", t)
        }
      }
    }
    
    object CustomReceiver {
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
    
        // Create a DStream that will connect to hostname:port, like localhost:9999
    
        val lines = ssc.receiverStream(new CustomReceiver("master01", 9999))
    
        // Split each line into words
        val words = lines.flatMap(_.split(" "))
    
        //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
    
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()
    
        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        //ssc.stop()
      }
    }
    
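The receiver above connects to master01:9999 and hands every line it reads to Spark through store(). To try it locally without a real data source, any process that writes lines to that port will do (for example `nc -lk 9999`). The sketch below is a hypothetical stand-alone helper, not part of the original project: it assumes the receiver is pointed at localhost instead of master01 and emits one line per second.

    package com.atguigu.streaming

    import java.io.PrintWriter
    import java.net.ServerSocket

    // Hypothetical test helper: a tiny line source for local runs of CustomReceiver.
    // Start it first, then run the streaming job with new CustomReceiver("localhost", 9999).
    object TestSocketServer {
      def main(args: Array[String]): Unit = {
        val server = new ServerSocket(9999)
        while (true) {
          val socket = server.accept()                    // wait for the receiver to connect
          val out = new PrintWriter(socket.getOutputStream, true)
          try {
            (1 to 100).foreach { i =>
              out.println(s"hello spark streaming $i")    // one line per second
              Thread.sleep(1000)
            }
          } finally {
            out.close()
            socket.close()
          }
        }
      }
    }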

    \sparkStreaming\sparkstreaming_helloworld\pom.xml[6]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_helloworld</artifactId>
    
        <dependencies>
    
        </dependencies>
    
        <build>
            <finalName>networdcount</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.WorldCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    \sparkStreaming\sparkstreaming_helloworld\src\main\scala\com\atguigu\streaming\WorldCount.scala[7]

    package com.atguigu.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    object WorldCount {
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
    
        // Create a DStream that will connect to hostname:port, like localhost:9999
        val lines = ssc.socketTextStream("zk1", 9999)
    
        // Split each line into words
        val words = lines.flatMap(_.split(" "))
    
        //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
    
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()
    
        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        
        //ssc.stop()
      }
    }
    
    
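WorldCount above blocks in awaitTermination() until the process is killed. For a bounded local test, awaitTerminationOrTimeout combined with a graceful stop can replace that call; a minimal sketch, assuming the same `ssc` defined in the main method above:

    // Sketch only: would replace ssc.awaitTermination() in the main method above.
    // Wait up to 60 seconds; if the context is still running, stop it gracefully so
    // that batches already received are fully processed before shutdown.
    if (!ssc.awaitTerminationOrTimeout(60000)) {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }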

    \sparkStreaming\sparkstreaming_kafka\pom.xml[8]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_kafka</artifactId>
    
        <dependencies>
    
        <!-- commons-pool2 provides the object pooling used for the Kafka producers -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.4.2</version>
            </dependency>
    
        <!-- Kafka client library used to talk to the Kafka cluster -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.2.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <finalName>kafkastreaming</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.KafkaStreaming</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\KafkaStreaming.scala[9]

    package com.atguigu.streaming
    
    import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.function.VoidFunction
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    
    // Singleton object that builds the Kafka producer pool
    object createKafkaProducerPool{
    
      // Returns the actual object pool, a GenericObjectPool
      def apply(brokerList: String, topic: String):  GenericObjectPool[KafkaProducerProxy] = {
        val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
        val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
        // Configure the size of the Kafka producer pool
        val poolConfig = {
          val c = new GenericObjectPoolConfig
          val maxNumProducers = 10
          c.setMaxTotal(maxNumProducers)
          c.setMaxIdle(maxNumProducers)
          c
        }
        // Build and return the pool
        new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
      }
    }
    
    object KafkaStreaming{
    
      def main(args: Array[String]) {
    
        // Set up the SparkConf
        val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
        // Create the StreamingContext with a 1-second batch interval
        val ssc = new StreamingContext(conf, Seconds(1))

        // Kafka broker addresses
        val brobrokers = "192.168.56.150:9092,192.168.56.151:9092,192.168.56.152:9092"
        // Kafka source topic
        val sourcetopic = "source1"
        // Kafka target topic
        val targettopic = "target1"

        // Consumer group id
        val group = "con-consumer-group"
    
        // Kafka consumer configuration
        val kafkaParam = Map(
          // addresses used to bootstrap the connection to the cluster
          "bootstrap.servers" -> brobrokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          // identifies which consumer group this consumer belongs to
          "group.id" -> group,
          // where to start when there is no committed offset, or the committed offset no
          // longer exists on the server; "latest" resets to the most recent offset
          "auto.offset.reset" -> "latest",
          // if true, the consumer's offsets are committed automatically in the background
          "enable.auto.commit" -> (false: java.lang.Boolean)
          //ConsumerConfig.GROUP_ID_CONFIG
        )
    
        // Create the direct DStream that receives the input data from Kafka
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(sourcetopic), kafkaParam))
    
    
        // Each record of the stream is a ConsumerRecord[String, String]
        stream.foreachRDD(rdd => {
          // The HasOffsetRanges cast only succeeds on the RDD that comes directly from
          // createDirectStream, so read the offset ranges before any transformation.
          // Committing them back to Kafka would additionally require
          // CanCommitOffsets.commitAsync (see the sketch after this listing).
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // Run an action on each partition of the transformed RDD
          rdd.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachPartition(partitionOfRecords => {
            // Kafka producer pool for this partition
            val pool = createKafkaProducerPool(brobrokers, targettopic)
            // Borrow a producer from the pool
            val p = pool.borrowObject()
            // Send every record in this partition to the target topic
            partitionOfRecords.foreach { message =>
              println(message._2)
              p.send(message._2, Option(targettopic))
            }

            // Return the producer to the pool when done
            pool.returnObject(p)
          })
        })
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
    
    
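Because enable.auto.commit is false, the offsets read in foreachRDD above are never written back, so the consumer group makes no progress in Kafka across restarts. A hedged sketch of committing them, reusing the `stream` created by createDirectStream above; CanCommitOffsets ships with spark-streaming-kafka-0-10:

    // Sketch: commit the consumed offsets back to Kafka once each batch has been processed.
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // Read the ranges on the untransformed RDD, before any map/filter
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process rdd here ...

      // Asynchronously commit this batch's offsets for the consumer group
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }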

    \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\PooledKafkaProducerAppFactory.scala[10]

    package com.atguigu.streaming
    import java.util.Properties
    import org.apache.commons.pool2.impl.DefaultPooledObject
    import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    
    case class KafkaProducerProxy(brokerList: String,
                                producerConfig: Properties = new Properties,
                                defaultTopic: Option[String] = None,
                                producer: Option[KafkaProducer[String, String]] = None) {
    
      type Key = String
      type Val = String
    
      require(brokerList != null && !brokerList.isEmpty, "Must set broker list")
    
      private val p = producer getOrElse {
    
        val props: Properties = new Properties()
        props.put("bootstrap.servers", brokerList)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        new KafkaProducer[String, String](props)
      }
    
    
      // Wrap a message into a ProducerRecord, using the default topic if none is given
      private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
        val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
        require(!t.isEmpty, "Topic must not be empty")
        key match {
          case Some(k) => new ProducerRecord(t, k, value)
          case _ => new ProducerRecord(t, value)
        }
      }
    
      def send(key: Key, value: Val, topic: Option[String] = None) {
        // Delegate to KafkaProducer.send to publish the message
        p.send(toMessage(value, Option(key), topic))
      }
    
      def send(value: Val, topic: Option[String]) {
        send(null, value, topic)
      }
    
      def send(value: Val, topic: String) {
        send(null, value, Option(topic))
      }
    
      def send(value: Val) {
        send(null, value, None)
      }
    
      def shutdown(): Unit = p.close()
    
    }
    
    
    abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
    
      def newInstance(): KafkaProducerProxy
    }
    
    class BaseKafkaProducerFactory(brokerList: String,
                                      config: Properties = new Properties,
                                      defaultTopic: Option[String] = None)
      extends KafkaProducerFactory(brokerList, config, defaultTopic) {
    
      override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)
    
    }
    
    // Extends BasePooledObjectFactory and supplies the pooled object type
    class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)
      extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {

      // Called by the pool to create a new object
      override def create(): KafkaProducerProxy = factory.newInstance()

      // Called by the pool to wrap a newly created object
      override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)

      // Called by the pool to destroy an object that is no longer needed
      override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
        p.getObject.shutdown()
        super.destroyObject(p)
      }
    
    }
    
    
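For reference, this is how the pool defined above is used on an executor; it mirrors what KafkaStreaming.scala does inside foreachPartition. The broker address and topic below are placeholders taken from that example:

    // Usage sketch for the producer pool defined above (placeholder broker and topic).
    val pool = createKafkaProducerPool("192.168.56.150:9092", "target1")
    val producer = pool.borrowObject()        // take a KafkaProducerProxy from the pool
    producer.send("hello from the pool")      // goes to the default topic "target1"
    pool.returnObject(producer)               // always hand the producer back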

    \sparkStreaming\sparkstreaming_queueRdd\pom.xml[11]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_queueRdd</artifactId>
    
        <dependencies>
    
        </dependencies>
    
        <build>
            <finalName>queueRdd</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.QueueRdd</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    \sparkStreaming\sparkstreaming_queueRdd\src\main\scala\com\atguigu\streaming\QueueRdd.scala[12]

    package com.atguigu.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    object QueueRdd {
    
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("QueueRdd")
        val ssc = new StreamingContext(conf, Seconds(1))
    
        // Create the queue through which RDDs can be pushed to a QueueInputDStream
        val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
    
        // Create the QueueInputDStream and use it to do some processing
        val inputStream = ssc.queueStream(rddQueue)
    
        // Process the RDD data from the queue
        val mappedStream = inputStream.map(x => (x % 10, 1))
        val reducedStream = mappedStream.reduceByKey(_ + _)

        // Print the results
        reducedStream.print()
    
        // Start the computation
        ssc.start()
    
        // Create and push some RDDs into the queue
        for (i <- 1 to 30) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          Thread.sleep(2000)

          // The StreamingContext could also be stopped programmatically here
          //ssc.stop()
        }
    
        ssc.awaitTermination()
      }
    }
    
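queueStream consumes one queued RDD per batch interval by default, which is what the call above relies on. If every RDD currently in the queue should instead be processed together in a single batch, the oneAtATime flag can be switched off; a one-line sketch, assuming the same `ssc` and `rddQueue` as above:

    // Sketch: drain the whole queue in each batch instead of one RDD per batch.
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)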

    \sparkStreaming\sparkstreaming_statefulWordCount\pom.xml[13]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_statefulWordCount</artifactId>
    
        <dependencies>
    
        </dependencies>
    
        <build>
            <finalName>statefulwordcount</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.WorldCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    \sparkStreaming\sparkstreaming_statefulWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala[14]

    package com.atguigu.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    object WorldCount {
    
      def main(args: Array[String]) {
        // Create a SparkConf
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        // Create a StreamingContext with a 3-second batch interval
        val ssc = new StreamingContext(conf, Seconds(3))
        // A checkpoint directory is required for stateful operations
        ssc.checkpoint(".")

        // Read the lines sent to port 9999 on the master01 machine
        val lines = ssc.socketTextStream("master01", 9999)
    
        // Split each line into words on spaces, giving a DStream[String]
        val words = lines.flatMap(_.split(" "))

        //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
        // Map each word to a tuple (word, 1)
        val pairs = words.map(word => (word, 1))
    
    
        // Define an update function: `values` holds the values of the same key in the
        // current batch, `state` is the previous state value kept by the framework
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          // Total count of this word in the current batch
          val currentCount = values.foldLeft(0)(_ + _)
          // Previously saved total count
          val previousCount = state.getOrElse(0)
          // Return the new total count
          Some(currentCount + previousCount)
        }
    
        // Use updateStateByKey: the type parameter is the state type, and the update
        // function defined above is passed in
        val stateDstream = pairs.updateStateByKey[Int](updateFunc)
        // Output
        stateDstream.print()
        stateDstream.saveAsTextFiles("hdfs://master01:9000/statful/", "abc")
    
        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        //ssc.stop()
      }
    
    }
    
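updateStateByKey recomputes state for every key on every batch. Since Spark 1.6, mapWithState gives the same running word count while only touching the keys that appear in the current batch; a hedged sketch of the equivalent, assuming the same `pairs` DStream and checkpoint directory as above:

    // Sketch: running word count with mapWithState instead of updateStateByKey.
    import org.apache.spark.streaming.{State, StateSpec}

    // For each (word, 1) pair, add it to the stored total and emit (word, newTotal)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    val stateDstream = pairs.mapWithState(StateSpec.function(mappingFunc))
    stateDstream.print()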
    

    \sparkStreaming\sparkstreaming_windowWordCount\pom.xml[15]

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>sparkStreaming</artifactId>
            <groupId>com.atguigu</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>sparkstreaming_windowWordCount</artifactId>
    
        <dependencies>
    
        </dependencies>
    
        <build>
        <finalName>windowwordcount</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.atguigu.streaming.WorldCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    \sparkStreaming\sparkstreaming_windowWordCount\src\main\resources\log4j.properties[16]

    #
    # Copyright (c) 2017. WuYufei All rights reserved.
    #
    
    log4j.rootLogger=warn,stdout,R
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    
    log4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=spark.log
    log4j.appender.R.MaxFileSize=1024KB
    log4j.appender.R.MaxBackupIndex=1
    
    log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    
    

    \sparkStreaming\sparkstreaming_windowWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala[17]

    package com.atguigu.streaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    /**
      * Created by wuyufei on 06/09/2017.
      */
    object WorldCount {
    
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        // batchInterval = 2s
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("./checkpoint")
    
        // Create a DStream that will connect to hostname:port, like localhost:9999
        val lines = ssc.socketTextStream("master01", 9000)
    
        // Split each line into words
        val words = lines.flatMap(_.split(" "))
    
        //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
    
        //val wordCounts = pairs.reduceByKey((a:Int,b:Int) => (a + b))
    
        // Window length 12s (12/2 = 6 batches), slide interval 6s (6/2 = 3 batches);
        // both must be whole multiples of the batch interval
        //val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))

        // The variant with an inverse function (_ - _) requires the checkpoint directory set above
        val wordCounts2 = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(12), Seconds(6))
    
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts2.print()
    
        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        //ssc.stop()
    
      }
    
    }
    
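For comparison, the same 12-second window with a 6-second slide can be written by windowing first and reducing afterwards, which needs no inverse function; a short sketch, assuming the same `pairs` DStream as above:

    // Sketch: window the (word, 1) pairs over 12 seconds, sliding every 6 seconds,
    // then reduce within each window; output is equivalent to wordCounts2 above.
    val windowedCounts = pairs.window(Seconds(12), Seconds(6)).reduceByKey(_ + _)
    windowedCounts.print()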

    1. \pom.xml

    2. \sparkStreaming\pom.xml

    3. \sparkStreaming\sparkstreaming_customerReceiver\pom.xml

    4. \sparkStreaming\sparkstreaming_customerReceiver\src\main\resources\log4j.properties

    5. \sparkStreaming\sparkstreaming_customerReceiver\src\main\scala\com\atguigu\streaming\CustomReceiver.scala

    6. \sparkStreaming\sparkstreaming_helloworld\pom.xml

    7. \sparkStreaming\sparkstreaming_helloworld\src\main\scala\com\atguigu\streaming\WorldCount.scala

    8. \sparkStreaming\sparkstreaming_kafka\pom.xml

    9. \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\KafkaStreaming.scala

    10. \sparkStreaming\sparkstreaming_kafka\src\main\scala\com\atguigu\streaming\PooledKafkaProducerAppFactory.scala

    11. \sparkStreaming\sparkstreaming_queueRdd\pom.xml

    12. \sparkStreaming\sparkstreaming_queueRdd\src\main\scala\com\atguigu\streaming\QueueRdd.scala

    13. \sparkStreaming\sparkstreaming_statefulWordCount\pom.xml

    14. \sparkStreaming\sparkstreaming_statefulWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala

    15. \sparkStreaming\sparkstreaming_windowWordCount\pom.xml

    16. \sparkStreaming\sparkstreaming_windowWordCount\src\main\resources\log4j.properties

    17. \sparkStreaming\sparkstreaming_windowWordCount\src\main\scala\com\atguigu\streaming\WorldCount.scala

Title: Spark Streaming Code

Link: https://www.haomeiwen.com/subject/vmjzjctx.html