Spark Streaming整合Flume

作者: 董二弯 | 来源:发表于2019-05-23 16:20 被阅读1次

前几章介绍了FlumeSpark Streaming入门Spark Streaming进阶。这一章一起学习Spark Streaming整合Flume。

概述

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。Spark Streaming对接Flume有两种方式,一种是Flume将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Pull拉取数据。在实际中用Pull方式较多,因为其中使用到的SparkSink具有缓冲区的作用,SparkStreaming通过有效的flume receiver去从Sink中拉取数据,拉取的数据有多副本的存储方式,增加了容错性,稳定性好并且在拉取过程中具有事务性。而Push方式可能出现数据丢失的问题。所以这里只演示Pull的方式。

整合步骤

  • 配置flume agent
    监听 /root/data/data.log文件,在文件中有新的输入时,把输入的内容输出到SparkSink。同时SparkSink绑定的IP为虚拟机本地的IP,端口只要是未占用的都行。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command= tail -F /root/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.30.131
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动flume agent
../bin/flume-ng agent -n a1 -c conf/ -f exec-memory-spark.conf -Dflume.root.logger=INFO,console
  • 本地Scala 程序
    pom文件
 <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
 </dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.2.0</version>
 </dependency>
<dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
</dependency>

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming整合Flume的Pull方式
  */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    //获取flume中数据
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream.map(x=> new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 本地测试
    在程序中监听的IP地址和端口是通过外部传参的方式处理,这么做方便在服务器部署时指定任意监听的位置,避免了硬编码的局限。
    在本地测试时需要在如图位置指定参数。这里的IP和端口一定要和flume agent配置中的相同。


    image.png

    启动程序后,在data.log中输入字符。


    image.png
    观察控制台信息,发现可以统计出字符出现的次数。
    image.png
  • 服务器测试
    在实际开发中,程序开发完毕时需要部署在服务器运行。此时只需要把程序打成jar包,在根据spark-submit的方式运行即可。

  • 首先通过idea跳过测试打包


    image.png
  • 把jar包上传到服务器

  • spark-submit启动作业
    要在有网的环境下,--packages需要拉取spark streaming和flume集成的包。

./spark-submit --master local[2] --name FlumePullWordCount --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 --class com.imooc.spark.FlumePullWordCount ~/lib/sparktrain-1.0.jar 192.168.30.131 8888
  • 测试
    在flume监听的输入文件中输入字符


    image.png

    观察服务器上运行作业的日志


    image.png
    成功

相关文章

网友评论

    本文标题:Spark Streaming整合Flume

    本文链接:https://www.haomeiwen.com/subject/epzmzqtx.html