前几章介绍了Kafka、Spark Streaming入门、Spark Streaming进阶。在这一章一起学习Spark Streaming和Kafka的整合。
概述
kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream(需要receiver接收),另一种为KafkaUtils.createDirectStream。其中推荐使用KafkaUtils.createDirectStream的方式相比基于Receiver方式有几个优点:
- 简化并行
不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区一种的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。 - 高效
第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。 - 恰好一次语义(Exactly-once-semantics)
Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具。 - 版本限制
除了以上的原因,由于在学习Kafka时安装的版本是2.2.0,查询官方文档Spark Streaming整合Kafka在0.10已经不支持Receiver的方式。
image.png
综上我们只演示KafkaUtils.createDirectStream的方式进行整合。
整合流程
- 启动zookeeper集群
zkServer.sh start
-
启动kafka集群
在启动之前在server.properties中根据虚拟机地址配置listeners的地址
image.png
因为不配置在启动整合代码时报Broker may not be available的错误,通过百度后指定listeners的地址即可。
启动kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- 创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
//查看创建的topic,有记录说明创建成功
kafka-topics.sh --list --zookeeper localhost:2181
- 启动生成者,向topic中生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark
- 编写SparkStreaming应用程序
- pom依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.3</version>
</dependency>
这里spark-streaming的版本我选择的是spark-streaming_2.12:2.42版本,这是由于我本地用的Scala的环境是2.12.8,spark-streaming这个版本中用到的Scala版本就是2.12.8。之前我使用的是spark-streaming_2.12:2.11.8版本,项目启动时报环境不匹配的问题。所以在本地演示时需要选择合适的版本。
- Scala代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Spark Streaming整合Kafka
*
* @author zhiying.dong@hand-china.com 2019/05/24 16:54
*/
object KafkaDirectWordCount{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("DirectKafka")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val topicsSet = Array("kafka_spark")
val kafkaParams = mutable.HashMap[String, String]()
//必须添加以下参数,否则会报错
kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
kafkaParams.put("group.id", "group1")
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
)
)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
- 本地测试
在生产者中输入统计字符
观察控制台发现可以统计字符出现的此时,说明Spark Streaming可以消费到Kafka中生产的消息
image.png
- 服务器测试
和之前一样,打包上传到服务器,在通过以下命令启动
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.3 --class com.imooc.spark.Test ~/lib/sparktrain-1.0.jar
网友评论