前几天简单了解了Hadoop(HDFS,MR,YRAN)之后,进一步了解一下现在使用比较多的Spark生态--Sprak Streaming。
简介
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Spark Streaming 是基于核心Spark API扩展的一个可扩展,高吞吐量,容错处理的实时流处理框架。

其核心思想是 将接受的数据流进行数据分批处理,然后经过Spark 引擎处理以数据分片方式输出到结果流。
核心概念
StreamingContext
要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,它是所有 Spark Streaming 功能的主要入口点。
Discretized Streams (DStreams)
Discretized Stream 或 DStream 是 Spark Streaming 提供的基本抽象。 它表示一个连续的数据流,可以是从源接收到的输入数据流,也可以是通过转换输入流生成的处理后的数据流。 在内部,DStream 由一系列连续的 RDD 表示,这是 Spark 对不可变的分布式数据集的抽象。 DStream 中的每个 RDD 都包含来自某个区间的数据,如下图所示。

对 DStream 应用的任何操作都会转换为对底层 RDD 的操作。对行 DStream 中的每个 RDD 应用 flatMap 操作以生成单词 DStream 的 RDD。 这如下图所示。

这块就比较像Spring5的 WebFlux里中的 FlatMap操作。
Input DStreams and Receivers
Input DStreams是DStreams 从流源接收的输入数据流的代表。
每个输入 DStream(除了文件流,本节稍后讨论)都与一个接相关联的Receiver。
词频统计代码
搭建Kafka
可以参考我另外一篇云服务器安装Kafka集群
本地生产者测试代码
为了测试方便,我使用SpringBoot 去搭建一个Kafka生产者的demo
工程目录如下

pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demok</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demok</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
YML 配置文件
spring:
kafka:
bootstrap-servers: xxx.xxx.xxx.xxx:9092 #你的KAFKA地址
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
KafkaProducer
package com.example.demok.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* description: KafkaProducer <br>
* date: 2021/10/22 9:32 <br>
* author: Neal <br>
* version: 1.0 <br>
*/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
//自定义topic
public static final String TOPIC_TEST = "test";
public void send(String str) {
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, str);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
System.out.println(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
}
测试类 DemokApplicationTests
package com.example.demok;
import com.example.demok.producer.KafkaProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DemokApplicationTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
void contextLoads() {
//声明三条消息
String[] arrStr = {"Deer,Bear,River","Car,Car,River","Deer,Car,Bear"};
//循环向Kafka发送消息
for (int i = 0; i < 3; i++) {
kafkaProducer.send(arrStr[i]);
}
}
}
生产者代码样例很简单,就是像上一篇文章一样,将3行数据写入数组中,循环发送到Kafka的test
Topic 中。
Spark Streaming 词频统计代码
词频统计代码工程目录很简单 就一个类 我就不贴图了直接贴代码,我使用的是Scala 2.11.8 版本
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkaSparkStream</artifactId>
<version>1.0-SNAPSHOT</version>
<name>kafkaSparkStream</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>3.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
统计代码
package org.example.ss
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/**
* description: App <br>
* date: 2021/10/23 14:47 <br>
* author: Neal <br>
* version: 1.0 <br>
*/
object App {
def main(args: Array[String]): Unit = {
//Spark配置
val sparkConfig = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
//初始化 Stream上下文 10秒计算一次
val ssc = new StreamingContext(sparkConfig,Seconds(10))
//Kafka 参数配置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "xxx.xxx.xxxx.xxx:9092", //kafka地址,集群可以以逗号分隔
"key.deserializer" -> classOf[StringDeserializer], //序列化类
"value.deserializer" -> classOf[StringDeserializer], //序列化类
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest", //消费最新的消息
"enable.auto.commit" -> (false: java.lang.Boolean) //不自动提交
)
//设置消费主题
val topics = Array("test")
//初始化 连接Kafka 获取到InputStream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream
//取出消息,逗号分隔后,将每一个值放入Map中 key 为值,value 为1
.flatMap(_.value().split(",").map((_,1)))
//使用key 值进行计算
.reduceByKey(_ + _).print()
//启动 SparkStreaming
ssc.start()
ssc.awaitTermination()
}
}
这里由于Spark 是Scala语言写的,因此如果想理解里面函数以及对应的参数符号含义,建议去简单看一下Scala 的相关教程入门一下就可以看懂了。
测试以及结果
启动Spark Streaming 词频统计程序
启动后可以看到如下输出

该输出会10秒一刷新,因为代码中配置的是每10S计算一次
启动生产者测试代码,向Kafka发送数据

可以看到生产者发送了3条消息到Kafka
查看计算结果
我们回到频次统计程序的输出页面,可以看到其计算后的输出结果。

小结
由于是简单的Spark Streaming的应用,只是为了熟悉Spark 流处理计算框架,为以后的技术方案做技术积累。
网友评论