Kafka从0.8版本到0.10版本提供了一种新的消费者api,所以根据你的kafka版本不同相应的有2种包可用,分别是spark-streaming-kafka-0-8 和spark-streaming-kafka-0-8,前者可兼容kafka 0.8及其以上版本,后者只能兼容0.10及其以上的版本,由于本篇文件基于Spark 2.2.0、Kafka 0.9.0,那么集成包的选择应该是选择前者,也就是spark-streaming-kafka-0-8 ,两个集成包的区别如下(图片截自官网):
使用spark-streaming-kafka-0-8,有两种方式可以使用Spark Streaming从Kafka接收数据,第一种是Receiver-based方式,基于Receiver使用Kafka高级API,第二种是Direct 方式(Since Spark 1.3),没有Receiver
-
Receiver-based
这种方式使用Receiver 接收数据,接收到的数据会保存在Spark executors里,启动Spark Streaming作业的时候再处理这些数据,这种方式很容易造成数据丢失,为了确保没有数据丢失,需要在Spark Streaming里开启Write Ahead Logs(从Spark 1.2引进)模式,数据处理前先把数据存储在(可以说是备份)分布式文件系统里(比如HDFS),这样在处理失败数据丢失的时候可以从文件系统恢复数据。
Receiver-Based
- 引入pom依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
- Spark Streaming程序代码
if(args.length!=4){
System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum,group,topics,numThreads) = args
val sparkConf = new SparkConf()//.setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap) message.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
Receiver这种方式需要注意三点:
- Spark Streaming中的partition和Kafka中的partition是没有关联的,如果增加每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题,并没有增加Spark处理数据的并行度
- 不同的Group和Topic我们可以使用多个Receiver创建多个DStream并行接收数据
- 如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER
- 打包到生产环境运行
因为Spark Streaming集成Kafka的jar包并未打到程序包里,所以spark-submit启动的时候需要通过--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0添加该jar包,第一次会先去下载jar包,速度会稍慢,第二次就可以直接使用了,详细命令如下:
spark-submit \
--class com.yxzc.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test kafka_streaming_topic 1
如果生产环境不能连接外网,或者网速很差时,可以先从maven仓库下载该jar包,然后在spark-submit是通过--jars指定该jar,个人比较推荐这种方式
-
Direct
spark1.3开始,引入了Direct方式,这种方式没有Receiver,它会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch,使用的是Kafka简单API
Direct Approach
这种方式相比Receiver方式,有以下优势:
- 简化的并行
在Receiver方式中我们需要创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。 - 在Receiver方式中为了保证数据的零丢失,需要开启Write Ahead Log模式,也就是需要把数据保存一份到类似HDFS一样的分布式文件系统中用来做数据恢复,这样Kafka里有一份数据,HDFS有一份数据,数据存了2份,对磁盘空间资源是一种浪费,Direct方式,不存在这个问题,只要Kafka中的数据保留的时间足够长,都能做到从kafka进行数据恢复
- 精确的一次处理
在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。Receiver方式是从Zookeeper中读取offset值,也就是Zookeeper保存当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录
- 引入pom依赖 同Receiver方式一样
- Spark Streaming程序代码
if(args.length!=2){
System.err.println("Usage KafkaDirectWordCount <brokers> <topics> ")
System.exit(1)
}
val Array(brokers,topics) = args
val sparkConf = new SparkConf()//.setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val topicsSet = topics.split(",").toSet
val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicsSet)
// message.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
- 打包到生产环境运行 同Receiver方式一样
网友评论