1. Preparation
For setting up the Kafka cluster, refer to the earlier post on Kafka cluster setup and configuration.
For setting up the Spark cluster, refer to the earlier post on building the Hadoop+HBase+Spark+Hive environment.
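The programs in section 2 read from the topic input-streaming-topic (and the optional sketch at the end writes to output-streaming-topic). Unless automatic topic creation is enabled on the brokers, create the topics first, for example with Kafka's kafka-topics.sh script. Alternatively, here is a minimal sketch using the AdminClient from kafka-clients (the object name, partition count and replication factor are placeholders; the broker list is the one used in the code below):

package com.whut.demo

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopics {
  def main(args: Array[String]) {
    val props = new Properties()
    // Broker list, same as in the producer/consumer below
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092")
    val admin = AdminClient.create(props)
    // 3 partitions / replication factor 2 are placeholders -- adjust to your cluster
    val topics = Collections.singletonList(new NewTopic("input-streaming-topic", 3, 2.toShort))
    admin.createTopics(topics).all().get() // block until the topic has been created
    admin.close()
  }
}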
2. Writing the code (Scala)
Add the following Maven dependencies to pom.xml:
<properties>
    <kafka.version>2.0.0</kafka.version>
    <spark.version>2.3.1</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <!-- spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>breeze-viz_2.11</artifactId>
        <version>0.13.2</version>
    </dependency>
</dependencies>
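In addition to these dependencies, the <build> section of the pom (not shown here) also needs a plugin that compiles Scala sources, such as scala-maven-plugin; otherwise Maven will only compile Java code.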
The producer sends the message "yang yun ni hao sha a" once per second:
package com.whut.demo

import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaProducer {
  def main(args: Array[String]) {
    // Kafka broker list and input topic
    val brokers = "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092"
    val inputTopic = "input-streaming-topic"
    // Producer configuration
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Create the producer
    val producer = new KafkaProducer[String, String](props)
    // Send one message per second
    while (true) {
      val key: String = null
      val value = "yang yun ni hao sha a"
      val message = new ProducerRecord[String, String](inputTopic, key, value)
      producer.send(message)
      println(message)
      Thread.sleep(1000)
    }
  }
}
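Before starting the Spark job, you can verify that the messages are arriving by reading input-streaming-topic with Kafka's kafka-console-consumer.sh script.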
Spark Streaming acts as the consumer: every 2 seconds it counts how many times each word appeared during the last 10 seconds.
package com.whut.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkConsumer {
  def main(args: Array[String]) {
    /**
     * Spark master
     * local mode:   "local[*]"
     * cluster mode: "spark://192.168.1.32:7077"
     */
    val master = "local[*]"
    /**
     * Checkpoint directory (required by reduceByKeyAndWindow with an inverse function)
     * local mode:   "checkpoint"
     * cluster mode: "hdfs://master:9000/user/checkpoint"
     */
    val checkpoint = "checkpoint"
    // Set the log level
    LogConfig.setStreamingLogLevels()
    // Batch interval in seconds
    val batchDuration = 1
    // Input topic
    val inputTopic = "input-streaming-topic"
    // Output topic (not used in this listing; see the sketch after it)
    val outputTopic = "output-streaming-topic"
    // Initialize the StreamingContext
    val streamingContext = new StreamingContext(
      new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(master),
      Seconds(batchDuration)
    )
    streamingContext.checkpoint(checkpoint)
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Create the direct DStream
    val dStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Array(inputTopic), kafkaParams)
    )
    // Split each record's value into words and pair every word with a count of 1
    val lines = dStream.map(record => (record.key, record.value))
    val words = lines.map(_._2)
    val word = words.flatMap(_.split(" "))
    val pair = word.map(x => (x, 1))
    // Window length 10 seconds, sliding interval 2 seconds
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(2))
    wordCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
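The outputTopic variable defined above is not used in this listing. If you also want to push the windowed counts back to Kafka, a minimal sketch (placed before streamingContext.start() and assuming the same broker list as above) could look like this:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// For every window, write each (word, count) pair to outputTopic.
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new java.util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    records.foreach { case (word, count) =>
      producer.send(new ProducerRecord[String, String](outputTopic, word, count.toString))
    }
    producer.close()
  }
}

Creating the producer inside foreachPartition keeps the non-serializable KafkaProducer out of the task closure; for real workloads you would typically reuse one producer per executor instead of recreating it for every window.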
To keep the output readable, we also set the log level (Spark's internal Logging trait is private to Spark, so we use log4j directly):
package com.whut.demo

import org.apache.log4j.{Level, Logger}

object LogConfig {
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      println("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      // Only messages at WARN level or above will be printed
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
3. Submitting the streaming job in cluster mode
Local mode
Simply run both programs directly from IDEA.
Cluster mode
KafkaProducer can still be run directly from IDEA, but SparkConsumer must be built into a jar and submitted with spark-submit.
In IDEA, build the program into a jar (see the earlier post on building a jar in IDEA and submitting it to a Spark cluster).
Upload the jar to HDFS (the HDFS commands you may need are listed below):
hdfs dfs -ls /
hdfs dfs -rm /SparkConsumer.jar
hdfs dfs -put SparkConsumer.jar /
Submit the job with spark-submit:
spark-submit --class com.whut.demo.SparkConsumer --master spark://master:7077 hdfs://master:9000/SparkConsumer.jar
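Note that spark-streaming-kafka-0-10 and kafka-clients are not bundled with the Spark distribution, so either build SparkConsumer.jar as an assembly ("fat") jar that contains them, or add --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1 to the spark-submit command (the coordinates match the versions declared in the pom above).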