美文网首页
Flume Push方式整合Spark-Streaming

Flume Push方式整合Spark-Streaming

作者: zoran_af7d | 来源:发表于2020-06-07 10:50 被阅读0次

    Approach : Flume-style Push-based Approach

    Flume 可以使用push的方式来整合spark-streaming
    主要步骤为:

    创建flume配置文件(eg:flume_push_streaming.conf)
    对source、sink、channel进行配置

    simple-agent.sources = netcat-source
    simple-agent.sinks = avro-sink
    simple-agent.channels = memory-channel
    
    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444
    
    simple-agent.sinks.avro-sink.type = avro
    simple-agent.sinks.avro-sink.hostname = 192.168.31.209
    simple-agent.sinks.avro-sink.port = 41414
    
    simple-agent.channels.memory-channel.type = memory
    
    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.avro-sink.channel = memory-channe
    

    这里选择netcat作为source,并且指定了sink的地址和端口号
    这里的sink就是Spark集群的一个receiver,用来接收flume的Avro 类型数据
    并且该端口要在flume程序启动之前启动起来以供绑定

    相关spark代码:

    object flumePush {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("flumePush").setMaster("local[*]")
        val ssc = new StreamingContext(conf,Seconds(3))
    
        val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"192.168.31.209",41414)
        //将SparkFlumeEvent转换为String
        val lines: DStream[String] = flumeEvent.map(fe => new String(fe.event.getBody.array()))
        val res: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        res.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    其中指定的端口号是用来监听Flume sink
    spark接收到的flume发送的数据是SparkFlumeEvent类型的,其中包括了header和body,需要做转换取出body中的数据再进行操作

    相关文章

      网友评论

          本文标题:Flume Push方式整合Spark-Streaming

          本文链接:https://www.haomeiwen.com/subject/sgggzhtx.html