Flume Pull-Based Integration with Spark Streaming

Author: zoran_af7d | Published: 2020-06-07 10:49

    Approach: Pull-based Approach using a Custom Sink

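    Flume's sink does not connect directly to a Spark component. Instead, Flume pushes data into a custom sink, where the data stays buffered.
    Spark Streaming then pulls the data from that sink in batches, using a reliable receiver and transactions. A transaction succeeds only after the data has been received and replicated by Spark Streaming, so this approach gives stronger fault tolerance than having Flume push data straight to Spark.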
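
The following toy Scala model shows how the buffering and the "commit only after replication" rule interact. It is a hypothetical sketch and not the actual SparkSink implementation. `BufferedSinkModel`, `take`, `commit` and `rollback` are illustrative names only. A single in-memory queue stands in for the sink's buffer, and `commit` represents the receiver's acknowledgement once the batch has been stored and replicated.

```scala
import scala.collection.mutable

// Hypothetical toy model of pull-based delivery; NOT the real SparkSink code.
// Events stay buffered until the puller acknowledges (commits) the batch.
final class BufferedSinkModel[A] {
  private val buffer = mutable.Queue.empty[A]
  private var inFlight: Option[Vector[A]] = None

  // Flume side: events are pushed into the sink's buffer
  def put(event: A): Unit = buffer.enqueue(event)

  // Spark side: take up to n events as one batch; only one batch may be open
  def take(n: Int): Vector[A] = {
    require(inFlight.isEmpty, "previous batch not committed or rolled back")
    val batch = Vector.fill(math.min(n, buffer.size))(buffer.dequeue())
    inFlight = Some(batch)
    batch
  }

  // Acknowledge: the batch was received and replicated, so discard it
  def commit(): Unit = inFlight = None

  // Failure or timeout: put the batch back at the front so it is pulled again
  def rollback(): Unit = {
    inFlight.foreach { batch =>
      val rest = buffer.toVector
      buffer.clear()
      buffer ++= batch ++ rest
    }
    inFlight = None
  }

  // Events not yet committed (still buffered plus in flight)
  def pending: Int = buffer.size + inFlight.map(_.size).getOrElse(0)
}
```

In this model a rollback returns the batch to the front of the queue. A failed pull is therefore retried with the same events in the same order, and only `commit` removes events for good.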
    Flume configuration file flume_pull_streaming.conf:

    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel
    
    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444
    
    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = hadoop000
    simple-agent.sinks.spark-sink.port = 41414
    
    simple-agent.channels.memory-channel.type = memory
    
    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel = memory-channel
    

    This configuration sets the agent's sink type to org.apache.spark.streaming.flume.sink.SparkSink
    and specifies the hostname and port on which that sink listens.
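
Flume does not ship the SparkSink class, and core Spark Streaming does not include FlumeUtils. According to the Spark Flume integration guide, the Flume agent needs the spark-streaming-flume-sink jar, plus scala-library and commons-lang3, on its classpath. The Spark application depends on spark-streaming-flume. Below is a minimal build.sbt sketch. It assumes sbt, Scala 2.11 and Spark 2.4.5; these versions are assumptions, not taken from the article. The Flume connector was removed in Spark 3.0, so a 2.x release is needed.

```scala
// build.sbt: a sketch; the versions below are assumptions, match them to your cluster
scalaVersion := "2.11.12"

val sparkVersion = "2.4.5"

libraryDependencies ++= Seq(
  // Core streaming API (StreamingContext, DStream)
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Spark-side Flume connector: FlumeUtils.createPollingStream, SparkFlumeEvent
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)

// Flume side (not a dependency of this app): copy the spark-streaming-flume-sink_2.11,
// scala-library and commons-lang3 jars into the Flume agent's classpath, e.g. its lib/ directory
```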

    Spark Streaming-side code:

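    import java.nio.charset.StandardCharsets

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

    object flumePull {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: flumePull <hostname> <port>")
          System.exit(1)
        }
        val conf: SparkConf = new SparkConf().setAppName("flumePull").setMaster("local[*]")
        // Micro-batches every 3 seconds
        val ssc = new StreamingContext(conf, Seconds(3))

        // Poll the SparkSink at <hostname>:<port>; must match the sink's hostname/port in the Flume config
        val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] =
          FlumeUtils.createPollingStream(ssc, args(0), args(1).toInt)
        // Convert each SparkFlumeEvent body to a String (UTF-8 assumed for the event bytes)
        val lines: DStream[String] =
          flumeEvent.map(fe => new String(fe.event.getBody.array(), StandardCharsets.UTF_8).trim)
        // Word count within each batch
        val res: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        res.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }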
    

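    The hostname and port passed as arguments are those of the SparkSink (hadoop000 and 41414 in the configuration above), not those of the netcat source.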
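
Each 3-second batch in the code above computes an ordinary word count. The hypothetical `BatchWordCount` object below is a hedged illustration that runs without Spark or Flume. It repeats the same decode, split and count steps on a plain Scala collection. Here `groupBy` plus `sum` stands in for `reduceByKey`. The UTF-8 charset is an assumption, because the original code used the platform default.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical plain-Scala stand-in for one micro-batch of the DStream pipeline
object BatchWordCount {
  // Mirrors: new String(fe.event.getBody.array()).trim
  def decode(body: ByteBuffer): String =
    new String(body.array(), StandardCharsets.UTF_8).trim

  // Mirrors: flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

For example, the bodies "hello spark " and "hello flume" decode to two trimmed lines. They count to hello -> 2, spark -> 1, flume -> 1, which matches what `res.print()` would show for a batch containing those two events.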

Article link: https://www.haomeiwen.com/subject/tbinzhtx.html