作者: 烈格黑街 | 来源:发表于2019-08-15 14:45


    # Copyright (c) 2017. WuYufei All rights reserved.
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n


    package com.atguigu.streaming
    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.receiver.Receiver
    // Created by wuyufei on 06/09/2017.
    /**
     * A custom Spark Streaming receiver that connects to `host:port` over TCP and
     * emits every UTF-8 text line it reads as one record.
     *
     * The type parameter `String` is the type of the records this receiver produces;
     * received records are stored with `StorageLevel.MEMORY_AND_DISK_2`.
     *
     * @param host host name of the text server to connect to
     * @param port TCP port of the text server
     */
    class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      /** Starts a background thread that receives data, so that `onStart` returns immediately. */
      override def onStart(): Unit = {
        new Thread("Socket Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      /**
       * Nothing to clean up here: the thread running `receive()` checks `isStopped()`
       * on every line and exits on its own once it returns true.
       */
      override def onStop(): Unit = {}

      /**
       * Opens a socket connection and stores each received line until the receiver is
       * stopped or the server closes the connection, then asks Spark to restart the
       * receiver so it reconnects.
       */
      private def receive(): Unit = {
        import scala.util.control.NonFatal

        var socket: Socket = null
        try {
          socket = new Socket(host, port)
          val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            // Hand the record over to Spark; without this nothing reaches the DStream.
            store(line)
            line = reader.readLine()
          }
          // Connection closed by the server: try again once it is available.
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            // Could not connect to the server: retry later.
            restart("Error connecting to " + host + ":" + port, e)
          case NonFatal(t) =>
            // Any other non-fatal receive error: retry later.
            restart("Error receiving data", t)
        } finally {
          // Closing the socket also closes its input stream (and so the reader on top of it).
          if (socket != null) {
            try socket.close() catch { case NonFatal(_) => () } // best effort; a restart is already scheduled
          }
        }
      }
    }
    /** Word count over lines received from master01:9999 through [[CustomReceiver]]. */
    object CustomReceiver {
      def main(args: Array[String]): Unit = {
        // The receiver permanently occupies one core, so more than one local core is needed.
        val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))

        // DStream backed by the custom receiver connected to master01:9999.
        val lines = ssc.receiverStream(new CustomReceiver("master01", 9999))

        // Split each line into words, map each word to (word, 1) and count per batch.
        val wordCounts = lines
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)

        // Print the first ten elements of each batch. At least one output operation
        // must be registered, otherwise ssc.start() refuses to run.
        wordCounts.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
      }
    }


    package com.atguigu.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Created by wuyufei on 06/09/2017.
    /** Counts the words in each 1-second batch of lines read from the socket zk1:9999. */
    object WorldCount {
      def main(args: Array[String]): Unit = {
        // local[2]: one thread for the socket receiver, at least one for processing.
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))

        // DStream connected to zk1:9999.
        val lines = ssc.socketTextStream("zk1", 9999)

        // Split each line into words and count each word within the batch.
        val wordCounts = lines
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)

        // Print the first ten elements of each RDD generated in this DStream to the
        // console. An output operation is mandatory; without one ssc.start() throws.
        wordCounts.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
      }
    }


            <!-- 提供对象连接池的一种方式 -->
            <!-- 用来连接Kafka的工具类 -->


    package com.atguigu.streaming
    import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.function.VoidFunction
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Created by wuyufei on 06/09/2017.
    /** Builds a commons-pool2 pool of [[KafkaProducerProxy]] instances. */
    object createKafkaProducerPool {
      /**
       * @param brokerList      Kafka bootstrap servers
       * @param topic           default topic of every pooled producer
       * @param maxNumProducers upper bound on both the total and the idle number of producers
       * @return a new pool; callers must `returnObject` every producer they borrow
       */
      def apply(brokerList: String, topic: String, maxNumProducers: Int = 10): GenericObjectPool[KafkaProducerProxy] = {
        val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
        val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
        val poolConfig = {
          val c = new GenericObjectPoolConfig
          // Previously computed but never applied, so the pool used library defaults.
          c.setMaxTotal(maxNumProducers)
          c.setMaxIdle(maxNumProducers)
          c
        }
        new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
      }
    }
    /**
     * Consumes the Kafka topic `source1`, prefixes every value with `>>>>:`, writes
     * it to the topic `target1` through pooled producers, and commits the consumed
     * offsets after each batch has been written.
     */
    object KafkaStreaming {

      // One producer pool per JVM (driver or executor), created on first use and
      // shared by all partitions processed there. Creating a pool per partition, as
      // before, leaked one never-closed producer per partition per batch.
      private var producerPool: GenericObjectPool[KafkaProducerProxy] = _

      /** Returns the JVM-wide pool; the arguments are only used by the first call. */
      private def sharedProducerPool(brokers: String, topic: String): GenericObjectPool[KafkaProducerProxy] =
        synchronized {
          if (producerPool == null) producerPool = createKafkaProducerPool(brokers, topic)
          producerPool
        }

      def main(args: Array[String]): Unit = {
        import org.apache.spark.streaming.kafka010.CanCommitOffsets

        val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))

        // NOTE(review): placeholder broker list - replace with real host:port entries.
        val brokers = ",,"
        val sourceTopic = "source1"
        val targetTopic = "target1"
        val group = "con-consumer-group"
        val kafkaParam = Map[String, Object](
          "bootstrap.servers" -> brokers, // addresses used to bootstrap the cluster connection
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> group,
          "auto.offset.reset" -> "latest",
          // Offsets are committed explicitly below, after each batch has been written.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(sourceTopic), kafkaParam))

        // foreachRDD is applied to the direct stream itself: only the RDDs it produces
        // implement HasOffsetRanges; an RDD coming out of stream.map(...) does not.
        stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          rdd.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachPartition { partitionOfRecords =>
            // Kafka producer pool.
            val pool = sharedProducerPool(brokers, targetTopic)
            val p = pool.borrowObject()
            try {
              partitionOfRecords.foreach { message =>
                println(message._2)
                p.send(message._2, Option(targetTopic))
              }
            } finally {
              // Always return the producer to the pool, even if sending failed.
              pool.returnObject(p)
            }
          }

          // Commit only once the batch's output has been written.
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }


    package com.atguigu.streaming
    import java.util.Properties
    import org.apache.commons.pool2.impl.DefaultPooledObject
    import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    // Created by wuyufei on 06/09/2017.
    /**
     * Thin wrapper around a `KafkaProducer[String, String]` that can fall back to a
     * default topic.
     *
     * @param brokerList     Kafka bootstrap servers; must be non-null and non-empty
     * @param producerConfig extra producer settings used when this proxy creates its own producer
     * @param defaultTopic   topic used when a `send` call does not name one
     * @param producer       an existing producer to wrap; when `None`, one is created from the settings above
     */
    case class KafkaProducerProxy(brokerList: String,
                                  producerConfig: Properties = new Properties,
                                  defaultTopic: Option[String] = None,
                                  producer: Option[KafkaProducer[String, String]] = None) {
      type Key = String
      type Val = String

      // The old check (`brokerList == null || ...`) accepted null, which later failed
      // with an NPE inside Properties.put.
      require(brokerList != null && brokerList.nonEmpty, "Must set broker list")

      private val p: KafkaProducer[Key, Val] = producer.getOrElse {
        val props = new Properties()
        // Caller-supplied settings first (previously ignored), then the ones this proxy relies on.
        props.putAll(producerConfig)
        props.put("bootstrap.servers", brokerList)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
      }

      /**
       * Builds a record for `topic`, or for `defaultTopic` when `topic` is `None`.
       *
       * @throws IllegalArgumentException if neither topic is available or the topic is empty
       */
      private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
        val t = topic.orElse(defaultTopic).getOrElse(throw new IllegalArgumentException("Must provide topic or default topic"))
        require(!t.isEmpty, "Topic must not be empty")
        key match {
          case Some(k) => new ProducerRecord(t, k, value)
          case None    => new ProducerRecord(t, value)
        }
      }

      /** Sends `value` with `key` (null means no key) to `topic`, or to the default topic. */
      def send(key: Key, value: Val, topic: Option[String] = None): Unit =
        p.send(toMessage(value, Option(key), topic))

      /** Sends an un-keyed `value` to `topic`, or to the default topic when `None`. */
      def send(value: Val, topic: Option[String]): Unit =
        send(null, value, topic)

      /** Sends an un-keyed `value` to `topic`. */
      def send(value: Val, topic: String): Unit =
        send(null, value, Option(topic))

      /** Sends an un-keyed `value` to the default topic. */
      def send(value: Val): Unit =
        send(null, value, None)

      /** Closes the underlying producer. */
      def shutdown(): Unit = p.close()
    }
    /**
     * Creates [[KafkaProducerProxy]] instances. It is `Serializable`, presumably so it
     * can be held by the serializable pool factory below; confirm before relying on it.
     *
     * @param brokerList Kafka bootstrap servers
     * @param config     producer settings
     * @param topic      default topic of the created producers
     */
    abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
      /** Returns a new producer proxy. */
      def newInstance(): KafkaProducerProxy
    }
    /**
     * Default [[KafkaProducerFactory]]: every call to `newInstance` builds a fresh
     * [[KafkaProducerProxy]] from the broker list, settings and default topic given here.
     */
    class BaseKafkaProducerFactory(brokerList: String,
                                   config: Properties = new Properties,
                                   defaultTopic: Option[String] = None)
      extends KafkaProducerFactory(brokerList, config, defaultTopic) {

      override def newInstance(): KafkaProducerProxy = new KafkaProducerProxy(brokerList, config, defaultTopic)
    }
    /**
     * commons-pool2 object factory for [[KafkaProducerProxy]]: extends the base pooled
     * object factory with the pooled type and tells the pool how to create, wrap and
     * destroy producers.
     *
     * @param factory creates the underlying producer proxies
     */
    class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)
      extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {

      /** Called by the pool to create a new object. */
      override def create(): KafkaProducerProxy = factory.newInstance()

      /** Called by the pool to wrap an object for its bookkeeping. */
      override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)

      /** Called by the pool to destroy an object: closes the wrapped Kafka producer. */
      override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
        p.getObject.shutdown()
        super.destroyObject(p)
      }
    }


    package com.atguigu.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable
    /**
     * Demonstrates `queueStream`: pushes 30 RDDs containing 1..300 into a queue, one
     * every 2 seconds, and prints per batch how many numbers fall into each residue mod 10.
     */
    object QueueRdd {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("QueueRdd")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Queue through which RDDs are pushed into the QueueInputDStream. Writes are
        // guarded by rddQueue.synchronized instead of the deprecated SynchronizedQueue.
        val rddQueue = new mutable.Queue[RDD[Int]]()

        // Create the QueueInputDStream and count numbers per residue mod 10.
        val inputStream = ssc.queueStream(rddQueue)
        val reducedStream = inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _)
        reducedStream.print()

        ssc.start()

        // Create and push some RDDs into the queue.
        for (_ <- 1 to 30) {
          rddQueue.synchronized {
            rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          }
          Thread.sleep(2000)
        }

        // Stop gracefully so batches already in progress complete.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }


    package com.atguigu.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Created by wuyufei on 06/09/2017.
    /**
     * Running word count: keeps a per-word total across all batches with
     * `updateStateByKey`, reading lines from master01:9999 every 3 seconds.
     */
    object WorldCount {
      def main(args: Array[String]): Unit = {
        // Create the SparkConf.
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        // Create the StreamingContext with a 3-second batch interval.
        val ssc = new StreamingContext(conf, Seconds(3))
        // Stateful operations need a checkpoint directory; without it
        // updateStateByKey fails when the context starts.
        ssc.checkpoint("./checkpoint")

        // Lines sent to port 9999 on master01.
        val lines = ssc.socketTextStream("master01", 9999)
        // Split lines on spaces and map every word to the tuple (word, 1).
        val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

        // Update function: `values` are this batch's counts for one key, `state` is the
        // total the framework kept for that key after the previous batch.
        val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
          (values, state) => Some(values.sum + state.getOrElse(0))

        // updateStateByKey's type parameter is the state type.
        val stateDstream = pairs.updateStateByKey[Int](updateFunc)
        stateDstream.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
      }
    }


    # Copyright (c) 2017. WuYufei All rights reserved.
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%t]  %-c(line:%L) : %m%n


    package com.atguigu.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // Created by wuyufei on 06/09/2017.
    /**
     * Windowed word count over lines from master01:9000: every 6 seconds, prints the
     * word counts of the last 12 seconds of data.
     */
    object WorldCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        // batchInterval = 3s (was Seconds(2), contradicting the window arithmetic below).
        val ssc = new StreamingContext(conf, Seconds(3))
        // The inverse-function variant of reduceByKeyAndWindow carries state between
        // windows and therefore requires a checkpoint directory.
        ssc.checkpoint("./checkpoint")

        // Create a DStream that will connect to master01:9000.
        val lines = ssc.socketTextStream("master01", 9000)
        val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

        // Other variants:
        //   per-batch count:               pairs.reduceByKey(_ + _)
        //   window without inverse (no checkpoint needed):
        //     pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(12), Seconds(6))
        // Window length 12s = 4 batches, slide 6s = 2 batches. The window is updated
        // incrementally: counts of batches entering are added, of batches leaving subtracted.
        val wordCounts2 = pairs.reduceByKeyAndWindow(
          (a: Int, b: Int) => a + b,
          (a: Int, b: Int) => a - b,
          Seconds(12),
          Seconds(6))

        // Print the first ten elements of each RDD generated in this DStream to the console.
        wordCounts2.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
      }
    }

