Message queueing systems with publish-subscribe capability are commonly used to process messages. However, traditional message queues cannot handle the huge number of messages per second required by large-scale data processing applications.
Kafka is a publish-subscribe messaging system used by many IoT applications to process a huge number of messages.
Its key characteristics:
- Extremely fast: Kafka can process huge amounts of data, handling reads and writes from many application clients in short intervals of time
- Highly scalable: Kafka is designed to scale up and scale out to form a cluster using commodity hardware
- Persists a huge number of messages: Messages reaching Kafka topics are persisted to secondary storage, even while huge numbers of messages are flowing through
The three important elements in Kafka:
- Producer: The real source of the messages, such as weather sensors or a mobile phone network
- Broker: The Kafka cluster, which receives and persists the messages published to its topics by various producers
- Consumer: The data processing applications that subscribe to Kafka topics and consume the messages published to them
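To make the producer's role concrete, here is a minimal sketch of a standalone Scala producer built on the kafka-clients API (the same client library the submit script below ships to the cluster). The broker address localhost:9092 and the topic name sfb are assumptions matching the single-node setup used later in this article.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object SimpleLogProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumed single-node broker; adjust for your environment
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Each send publishes one message to the "sfb" topic used later in this article
    producer.send(new ProducerRecord[String, String]("sfb", "[Fri Dec 20 01:46:23 2015] [ERROR] sample log line"))
    producer.close()
  }
}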
1. Project structure
1.1 The sbt file
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"
//libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.4" % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.4.4" % "provided"
//libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
//"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
)
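A note on the sbt syntax: the %% operator automatically appends the Scala binary version (2.11 here, derived from scalaVersion) to the artifact name, which is why the two dependency styles above are interchangeable. A minimal illustration:
// These two declarations resolve to the same artifact:
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.4"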
1.2 The submit.sh script
#!/bin/bash
#-----------
# submit.sh
#-----------
# IMPORTANT - This assumes that the $SPARK_HOME and $KAFKA_HOME environment variables are already set on the system running the application
# [FILLUP] Set your Spark master here. If monitoring is needed, use the desired Spark master URL, or use local mode
# When using local mode, it is important to give more than one core inside the square brackets
#SPARK_MASTER=spark://Rajanarayanans-MacBook-Pro.local:7077
#SPARK_MASTER=local[4]
SPARK_MASTER=spark://yay-ThinkPad-T470-W10DG:7077
# [OPTIONAL] Your Scala version
SCALA_VERSION="2.11"
# [OPTIONAL] Name of the application jar file. You should be OK to leave it like that
APP_JAR="simple-project_2.11-1.0.jar"
# [OPTIONAL] Absolute path to the application jar file
PATH_TO_APP_JAR="target/scala-$SCALA_VERSION/$APP_JAR"
# [OPTIONAL] Spark submit command
SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"
# [OPTIONAL] Pass the application name to run as the parameter to this script
APP_TO_RUN=$1
sbt package
if [ "${2:-0}" -eq 1 ]
then
$SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $KAFKA_HOME/libs/kafka-clients-2.1.1.jar,$KAFKA_HOME/libs/metrics-core-2.2.0.jar,$KAFKA_HOME/libs/zkclient-0.11.jar,./lib/kafka_2.11-0.8.2.1.jar,./lib/spark-streaming-kafka-0-8_2.11-2.4.4.jar $PATH_TO_APP_JAR
else
$SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER $PATH_TO_APP_JAR
fi
1.3 Source code
KafkaStreamingApps.scala
/**
The following program can be compiled and run using SBT
Wrapper scripts have been provided with this
The following script can be run to compile the code
./compile.sh
The following script can be used to run this application in Spark. The second command-line argument, with value 1, is very important: it flags the shipping of the Kafka jar files to the Spark cluster
./submit.sh com.packtpub.sfb.KafkaStreamingApps 1
**/
package com.packtpub.sfb
import java.util.HashMap
import org.apache.spark.streaming._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer,ProducerRecord}
import org.apache.log4j.{Level, Logger}
object KafkaStreamingApps {
def main(args: Array[String]) {
// Log level settings
LogSettings.setLogLevels()
// Variables used for creating the Kafka stream
//The quorum of Zookeeper hosts
val zooKeeperQuorum = "localhost"
// Message group name
val messageGroup = "sfb-consumer-group"
//Kafka topics list, separated by commas if there are multiple topics to be listened on
val topics = "sfb"
//Number of threads per topic
val numThreads = 1
// Create the Spark Session and the spark context
val spark = SparkSession.builder.appName(getClass.getSimpleName).getOrCreate()
// Get the Spark context from the Spark session for creating the streaming context
val sc = spark.sparkContext
// Create the streaming context
val ssc = new StreamingContext(sc, Seconds(10))
// Set the checkpoint directory for saving the data needed to recover from a crash
ssc.checkpoint("/tmp")
// Create the map of topic names to the number of receiver threads per topic
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Create the Kafka stream
val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
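// Note: createStream returns a receiver-based DStream of (key, message) pairs;
// the map(_._2) above keeps only the message text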
// Filter the log message lines containing the word ERROR
val errorLines = appLogLines.filter(line => line.contains("ERROR"))
// Print the line containing the error
errorLines.print()
// Count the number of error-message lines per sliding window and print the counts
errorLines.countByWindow(Seconds(30), Seconds(10)).print()
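// With a window length of 30 seconds and a slide interval of 10 seconds,
// each windowed count covers the three most recent 10-second batches.
// countByWindow is implemented with an incremental (inverse-reduce) computation,
// which is why the checkpoint directory was set above.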
// Start the streaming
ssc.start()
// Wait till the application is terminated
ssc.awaitTermination()
}
}
object LogSettings {
/**
Performs the necessary log4j logging-level settings
*/
def setLogLevels() {
val log4jInitialized =
Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// This is to make sure that the console is clean from other INFO messages printed by Spark
Logger.getRootLogger.setLevel(Level.ERROR)
}
}
}
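As an aside, the same spark-streaming-kafka-0-8 library also provides a receiver-less direct stream, which contacts the brokers directly instead of going through ZooKeeper and a receiver. The following is a minimal sketch (not part of the original program) of what the stream-creation line above would look like in that style, assuming a broker at localhost:9092:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
// Direct (receiver-less) stream: offsets are tracked by Spark itself,
// and the brokers are read directly instead of via a ZooKeeper quorum
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val directLogLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set("sfb")).map(_._2)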
2. Compiling
The code is compiled by running the provided wrapper script from the project root: ./compile.sh (see the header comment in KafkaStreamingApps.scala).
3. Submitting and running
3.1 Preparing the Kafka and ZooKeeper environment
3.1.1 Installing Kafka and ZooKeeper
Installation instructions are available elsewhere in this space, so they are not repeated here.
3.1.2 Starting the ZooKeeper and Kafka servers
Note that kafka-server-start.sh runs in the foreground by default, so run it in its own terminal:
$ZOOKEEPER_HOME/bin/zkServer.sh start
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
3.2 Starting the Spark cluster
$SPARK_HOME/sbin/start-all.sh
Or start the master and the worker separately:
$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
3.3 Creating the message stream
3.3.1 Creating and listing topics
Create the topic sfb with one partition and a replication factor of 1, then verify that it is listed:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sfb
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
3.4 Submitting the application
3.4.1 Creating the lib folder and downloading the required packages
Create a lib directory under the project root and download into it the kafka_2.11-0.8.2.1.jar and spark-streaming-kafka-0-8_2.11-2.4.4.jar files referenced by submit.sh (the complete project linked at the end of this article already includes this lib directory).
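With those jar files in place, submit the application using the script from section 1.2. The second command-line argument of 1 flags the shipping of the Kafka jar files to the Spark cluster:
./submit.sh com.packtpub.sfb.KafkaStreamingApps 1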
3.4.2 Starting a console producer
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb
3.4.3 Typing messages into the producer
Type the following sample log lines into the producer console; each line becomes one Kafka message:
[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
Then observe the output of our application: errorLines.print() shows the incoming lines containing ERROR, and countByWindow prints the number of such lines seen in each 30-second window.
The complete project (including the required lib directory) can be downloaded from: https://github.com/yinbodotcc/spark-stream-Kafka-0.8.git