美文网首页
Spark-自定义Receiver

Spark-自定义Receiver

作者: 布莱安托 | 来源:发表于2020-07-06 20:00 被阅读0次

    可以通过继承 Receiver 类来实现自定义接收器（采集器），需要实现以下方法：

    • onStart：接收器启动时调用。该方法不能阻塞，应启动一个新线程进行数据接收，然后立即返回
    • onStop：接收器停止时调用，用于释放资源（例如关闭 socket）

    同时还需要通过构造参数指定 Receiver 的存储级别（StorageLevel）。

    1. 代码实现

      /**
       * Custom Spark Streaming receiver that connects to a TCP server and stores every
       * UTF-8 text line it reads as one record, using StorageLevel.MEMORY_ONLY.
       *
       * @param host host name or IP address of the server to connect to
       * @param port TCP port of the server
       */
      class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      
        import scala.util.control.NonFatal
      
        // Assigned by the receiving thread and closed by onStop() on another thread,
        // so it is volatile to make the write visible across threads.
        @volatile var socket: Socket = _
      
        /** Starts receiving on a background thread; onStart() itself must return immediately. */
        override def onStart(): Unit = {
          val worker = new Thread(s"MyReceiver-$host:$port") {
            override def run(): Unit = receive()
          }
          // Do not keep the JVM alive just because this thread is blocked in readLine().
          worker.setDaemon(true)
          worker.start()
        }
      
        /** Closes the socket, which unblocks a pending readLine() in the receiving thread. */
        override def onStop(): Unit = {
          // Read the field once so a concurrent write cannot null it between check and close.
          val current = socket
          if (current != null) {
            current.close()
            socket = null
          }
        }
      
        /**
         * Connects to host:port and stores each received line until the receiver is stopped
         * or the server closes the connection. On disconnect or error, asks Spark to restart
         * the receiver, unless the receiver is already being stopped.
         */
        def receive(): Unit = {
          try {
            val s = new Socket(host, port)
            socket = s
            val reader = new BufferedReader(new InputStreamReader(s.getInputStream, StandardCharsets.UTF_8))
            try {
              // In Scala an assignment evaluates to Unit, so `(line = readLine()) != null`
              // is always true; read first, then test the value explicitly.
              var line = reader.readLine()
              while (!isStopped() && line != null) {
                store(line)
                line = reader.readLine()
              }
            } finally {
              reader.close()
              s.close()
            }
            // The server closed the connection: try to reconnect via a receiver restart.
            if (!isStopped()) {
              restart(s"Connection closed by $host:$port, trying to connect again")
            }
          } catch {
            case e: java.net.ConnectException if !isStopped() =>
              restart(s"Error connecting to $host:$port", e)
            case NonFatal(e) if !isStopped() =>
              restart("Error receiving data", e)
            case NonFatal(_) =>
              // The receiver is stopping: onStop() closed the socket, which makes readLine()
              // throw. This is expected, so there is nothing further to do.
              ()
          }
        }
      
      }
      
    2. 使用自定义接收器

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      
      /**
       * Word-count demo that feeds a StreamingContext from MyReceiver and prints the
       * per-batch word counts.
       *
       * Usage: MyReceiverDemo [host] [port]
       * When arguments are omitted, it connects to 192.168.0.100:9999.
       */
      object MyReceiverDemo {
        def main(args: Array[String]): Unit = {
      
          // Optional command-line overrides; fall back to the original hard-coded endpoint.
          val host = args.lift(0).getOrElse("192.168.0.100")
          val port = args.lift(1).map(_.toInt).getOrElse(9999)
      
          // local[4]: the receiver occupies one core for its whole lifetime, so more than one
          // local core is needed for the batches to actually be processed.
          val conf = new SparkConf().setMaster("local[4]").setAppName("MyReceiverDemo")
          // Each micro-batch covers 5 seconds of received data.
          val streamingContext = new StreamingContext(conf, Seconds(5))
      
          // Use StreamingContext.receiverStream to receive data through the custom Receiver.
          val receiverDStream = streamingContext.receiverStream(new MyReceiver(host, port))
      
          // Split each received line into words on single spaces.
          val flatMapDStream = receiverDStream.flatMap(_.split(" "))
      
          val mapDStream = flatMapDStream.map((_, 1))
      
          // Count occurrences of each word within the batch.
          val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)
      
          reduceByKeyDStream.print()
      
          streamingContext.start()
          streamingContext.awaitTermination()
        }
      }
      

    相关文章

      网友评论

          本文标题:Spark-自定义Receiver

          本文链接:https://www.haomeiwen.com/subject/pabgqktx.html