目录
- SparkStreaming实时架构概述
- Spark架构以及概述
- 实战
- 缓存,持久化
- 检查点
- cache函数
- 累加器, 广播变量
- 可靠性
- 性能调优
- 上线spark streaming实时作业
- 上线spark离线作业
SparkStreaming实时架构概述
特点
-
Spark Streaming是Spark核心API的扩展,支持实时流数据的处理。这个扩展将Spark的强大计算能力和实时数据流的处理能力结合在一起,用户可以使用高层次的功能,如map、reduce、join、window等,同时还可以直接使用Spark的各种特性,如Spark SQL、MLlib(机器学习库)、GraphX(图计算库)等
概述1.png
-
Spark Streaming的工作原理是将实时数据流拆分为小的批处理作业,然后通过Spark Engine进行处理。这种设计使得它可以同时提供实时流处理的功能和高速批处理的性能
概述2.png
-
Spark Streaming支持多种数据源,包括Kafka、Flume、Kinesis等,也可以通过自定义数据源来支持其它系统。并且,Spark Streaming可以很容易地和Hadoop生态系统进行集成,包括HDFS、HBase、Storm等
-
在容错方面,Spark Streaming也有很好的支持,比如通过checkpoint机制保证数据的一致性和可恢复性,通过write-ahead log保证数据的端到端的容错性
Spark Streaming架构
- 整体架构
- spark streaming有个Driver驱动程序用来出来接收到数据的spark作业,然后将作业分配给响应的工作节点对应的执行器,执行器有可能处理输入数据流,也有可能处理各种算子的任务
-
接受器接受输入流数据时会将数据加载到内存,并且会备份到另一个节点做灾备,处理完之后接收器会报告给驱动器,然后驱动器继续发布任务到工作节点
整体架构.png
-
局部架构,在Spark Streaming中,每个DStream(Spark Streaming使用 离散化流作为抽象表示)操作都会生成一个或多个RDD,这些RDD会被存储在block中。block元数据是关于block的信息,例如block的创建时间、大小、位置等。这边能看到Executor工作节点的接受器会报告block ids给Driver
局部架构.png
- 局部架构2
-
Driver会将block元数据写入log中,可以使用Spark Streaming的日志记录功能。可以在应用程序的配置中启用日志记录,并将日志级别设置为INFO或DEBUG。这样,Spark Streaming会将有关block的信息记录到日志中,这种就是WAL write ahead log功能,包括工作节点Executor的接收器在写入内存时也会写入log做到一定灾备功能,可以根据log中的信息来恢复block,从而保证数据的完整性和一致性。此外,将block元数据写入log还可以帮助进行系统监控和调试。通过查看log中的信息,可以了解系统的运行情况和性能表现,及时发现和解决问题
2.1 元数据Checkpoint,元数据主要包括配置设置,DStream操作,未完成的批处理等等,这些元数据会定期地保存到checkpoint目录中。在driver程序失败之后,可以使用这些元数据恢复出一个新的driver程序
2.2 数据Checkpoint,某些DStream的数据需要在跨batch之间保持状态,为了保证这些状态在失败后可以恢复,我们需要定期的把这些数据写入checkpoint目录。如果driver程序失败,那么这些数据可以用于恢复这些状态
局部架构2.png
- 压备机制: 如果要限制Receiver 的数据接收速率,可以通过设置参数的值来实现,这样可以做到通过限制接收速率,来适配当前的处理能力,防止内存溢出, 1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制即Spark Streaming Backpressure: 根据JobScheduler 反馈作业的执行信息来动态调整Receiver 数据接收率。通过属性
spark.streaming.backpressure.enabled
来控制是否启用backpressure 机制,默认值false,即不启用
- Apache Spark调度器的主要任务是将Spark应用程序的任务分配到集群的节点上。Spark中有两种类型的调度器,FIFO调度器和公平调度器。
- FIFO调度器:这是Spark的默认调度器。在此模式下,Spark按照作业提交的顺序执行作业。也就是说,先提交的作业将获得所有可用的资源,以便尽快完成。只有在第一个作业完成或释放一些资源时,后面提交的作业才会开始执行
- 公平调度器:与FIFO调度器相比,公平调度器更关注资源的公平分配。它尝试确保所有运行的作业都获得相等的资源份额。如果有新的作业提交,公平调度器会重新分配资源,以便所有作业都能获得公平的资源份额。这种模式可以避免"饥饿"现象,即一个大作业占用所有资源,而其他小作业无法执行。
Spark架构以及概述
-
具体架构跟Spark Streaming类似,但是总体的包含以下部分
架构.png
- Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的
- Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作
- Spark Streaming:对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据
- MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作
- GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作
- 概述,Spark离线的应用场景
- 数据仓库:Spark可以处理数据仓库中的大规模数据集,进行离线计算和分析。通过与Hive等数据仓库工具集成,Spark可以高效地读取、处理和分析存储在数据仓库中的数据
- 批处理:Spark可以用于批处理大规模数据集,对数据进行清洗、整合、转换等操作。这种离线处理方式适用于对实时性要求不高,但对于数据处理性能和扩展性要求较高的场景。比如非实时用户画像标签等
- 数据挖掘和机器学习:Spark的MLlib库提供了丰富的机器学习算法,包括分类、聚类、回归、协同过滤等。对于需要离线进行数据挖掘和机器学习的场景,Spark可以提供高效、可扩展的计算能力。
ETL(提取、转换、加载)流程:Spark可以用于ETL流程中的数据转换和加载,特别是对于需要处理大规模数据的ETL流程。通过Spark的数据处理能力,可以提高ETL流程的效率 - 数据湖:数据湖是一个存储大规模数据的平台,以低成本的方式存储和管理大数据。Spark可以作为数据湖的一个组件,用于处理和分析存储在数据湖中的数据
- 总的来说,Spark离线的应用场景主要涉及大规模数据的批处理、分析、挖掘和加载等操作,适用于对实时性要求不高的场景
spark架构网络通信
- spark集群中是如何做集群发现的,在Spark中,集群发现的方式取决于配置。如果配置了Zookeeper,那么就会采用Zookeeper的注册方式进行集群发现;如果没有配置Zookeeper,那么就会采用广播的方式进行集群发现:
- 广播的方式:Master通过将Worker的host信息广播给Executor,让Executor去发现Worker。这种方式需要在Master上存储了Executor的信息,所以一旦Master挂掉了,会导致Executor和Worker之间通信的中断,配置集群的时候Master就知道了所有executor
- 注册的方式:Worker将自己注册到Master上,当Master启动的时候,就会监听Zookeeper,一旦Worker注册到Zookeeper上,Master就可以知道Worker的信息了。这种方式需要在Zookeeper上存储了Worker的信息,所以一旦Zookeeper挂掉了,会导致Master和Worker之间通信的中断
spark master高可用
- Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。而ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务
- 因此,Spark为了解决Master单点故障的问题,采用了基于Zookeeper的Standby Master:将Spark集群连接到同一个Zookeeper实例,并启动多个Master,利用Zookeeper提供的选举和状态保存的功能,可以使一个Master被选举成活着的Master,其他Master处于Standby状态。如果现任的Master挂掉了,另外一个就可通过选举产生,并恢复到旧的Master状态,然后恢复调度
spark集群部署方式
- Standalone:独立模式,Spark原生的简单集群管理器,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,使用Standalone可以很方便地搭建一个集群,一般在公司内部没有搭建其他资源管理框架的时候才会使用,spark.driver.supervise 这个设置的作用是用于决定当Spark Driver进程失败时,是否需要重新启动。这个配置通常在Spark的独立集群模式下使用。当设置
时,如果Spark Driver进程失败,Spark Master将会尝试重新启动Driver。这对于那些需要长期运行,且能够从上次运行的状态恢复过来的应用程序来说,是非常有用的
- Mesos:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括YARN
- YARN:统一的资源管理机制,在上面可以运行多套计算框架,如MapReduce、Storm等。根据driver在集群中的位置不同,分为YARN client和YARN cluster
Spark Sql
- 代码例子,在这个例子中,我们首先创建了一个SparkSession对象,用于与Spark集群进行交互。然后,我们使用read()方法读取CSV文件并将其注册为DataFrame。接下来,我们使用sql()方法执行SQL查询,并将结果存储在result变量中。最后,我们显示查询结果并关闭SparkSession对象。请注意,你需要将path/to/csv/file.csv替换为实际的CSV文件路径,并根据需要修改SQL查询
public class SparkSQLExample {
public static void main(String[] args) {
// 创建SparkSession对象
SparkSession spark = SparkSession.builder()
.appName("Spark SQL Example")
.master("local[*]") // 在本地运行,使用所有可用核心
.getOrCreate();
// 读取CSV文件并将其注册为DataFrame
Dataset<Row> df = spark.read().format("csv")
.option("header", "true") // 假设CSV文件包含标题行
.load("path/to/csv/file.csv");
// 显示DataFrame的内容
df.show();
// 执行SQL查询
Dataset<Row> result = spark.sql("SELECT * FROM df WHERE column_name = 'value'");
// 显示查询结果
result.show();
// 关闭SparkSession
spark.stop();
}
}
- 线上这种Saprk Sql或者Flink Sql一般都是界面拖拽形式处理,比如DolphinScheduler配置Spark Sql处理或者二开,这边不做细致讨论
实战
初始化StreamingContext 对象创建
- 下面代码中的 appName 是你给该应用起的名字,这个名字会展示在 Spark 集群的 web UI上。而 master 是 Spark, Mesos or YARN cluster URL,如果支持本地测试,你也可以用”local[*]”为其赋值。通常在实际工作中,你不应该将master参数硬编码到代码里,而是应用通过spark-submit的参数来传递master的值
- 下面代码将批次间隔设置成Durations.seconds(5)即5秒,批次间隔设置需要根据业务需求,数据量,集群资源,延迟等参数设置,批次间隔越小实时性越高,数据量如果比较大批次间隔应该小一点
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingDemo");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
StreamingContext 对象创建后基本流程
- 创建 DStream 对象,并定义好输入数据源
- 基于数据源 DStream 定义好计算逻辑和输出
- 调用 streamingContext.start() 启动接收并处理数据
- 调用 streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
- 你可以主动调用 streamingContext.stop() 来手动停止处理流程
注意
- 一旦 streamingContext 启动,就不能再对其计算逻辑进行添加或修改
- 一旦 streamingContext 被 stop 掉,就不能 restart。
- 单个 JVM 虚机 同一时间只能包含一个 active 的 StreamingContext
- StreamingContext.stop() 也会把关联的 SparkContext 对象 stop 掉,如果不想把 SparkContext 对象也 stop 掉,可以将StreamingContext.stop 的可选参数 stopSparkContext 设为false。
- 一个 SparkContext 对象可以和多个 StreamingContext 对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可
离散数据流DStream
-
它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream 内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集。每个 RDD 都包含了特定时间间隔内的一批数据
DStream.png
-
任何作用于 DStream 的算子,其实都会被转化为对其内部 RDD 的操作,比如flatMap 算子(类似java8的flatMap),会施加于 lines 中的每个 RDD 上,并生成新的对应的 RDD,而这些新生成的 RDD 对象就组成了 words 这个 DStream 对象
flatMap 算子.png
输入DStream和接收器
- Spark Streaming 主要提供两种内建的流式数据源:
- 基础数据源: 在 StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor,接上面举例过的代码,这边是基础数据源的一种
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver());
public static class CustomReceiver extends Receiver<String> {
private boolean running = true;
public CustomReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
new Thread(this::generateData).start();
}
@Override
public void onStop() {
running = false;
}
private void generateData() {
Random random = new Random();
List<Integer> list = new ArrayList<>();
list.add(127);
list.add(128);
while (running) {
// 生成随机字符串
Date date = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
String strDate = formatter.format(date);
int randomIndex = random.nextInt(list.size());
String data = list.get(randomIndex) + " " + "test" + " " + "seeger " + strDate;
store(data);
try {
// 控制生成速度
Thread.sleep(5200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 高级数据源:需要依赖额外工具类的源,如Kafka、Flume、Rocketmq、Twitter等数据源。这些数据源都需要增加额外的依赖,下面给个Kafka的例子
// 设置Spark配置
SparkConf sparkConf = new SparkConf()
.setAppName("KafkaWordCount")
.setMaster("local[*]") // 在本地运行
.set("spark.executor.memory", "2g") // 设置每个executor的内存大小
.set("spark.driver.memory", "2g"); // 设置driver的内存大小
// 创建JavaStreamingContext对象
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(1000));
// 定义Kafka参数
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092"); // Kafka服务器地址
kafkaParams.put("key.deserializer", StringDeserializer.class); // Key的序列化类
kafkaParams.put("value.deserializer", StringDeserializer.class); // Value的序列化类
kafkaParams.put("group.id", "test"); // Consumer Group ID
kafkaParams.put("auto.offset.reset", "latest"); // 从最新的offset开始读取
kafkaParams.put("enable.auto.commit", false); // 不自动提交offset
// 定义Kafka主题列表
List<String> topics = Arrays.asList("topic1", "topic2");
// 从Kafka读取数据
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
DStream支持的transformation算子
Transformation算子 | 用途 | |
---|---|---|
map(func) | 返回会一个新的DStream,并将源DStream中每个元素通过func映射为新的元素 | |
flatMap(func) | 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出 | |
filter(func) | 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素 | |
repartition(numPartitions) | 更改DStream的并行度(增加或减少分区数) | |
union(otherStream) | 返回新的DStream,包含源DStream和otherDStream元素的并集 | |
count() | 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数 | |
reduce(func) | 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。 | |
countByValue() | 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。 | |
reduceByKey(func, [numTasks]) | 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数 | 。 |
join(otherStream, [numTasks]) | 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。 | |
cogroup(otherStream, [numTasks]) | 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。 | |
transform(func) | 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。 | |
updateStateByKey(func) | 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。 |
updateStateByKey算子
- 在每一个批次数据到达后,Spark都会调用状态更新函数,来更新所有已有key(不管key是否存在于本批次中)的状态。如果状态更新函数返回None,则对应的键值对会被删,调用 updateStateByKey 前需要配置检查点checkpointing目录,后续对此有详细的讨论
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = ...
return Optional.of(newSum);
};
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
- 可以使用updateStateByKey函数来处理有状态的转换操作,这包括连续的滑动窗口数据分析,比如连续两个窗口都达到条件则告警,比如你可能需要跟踪每个用户的购买历史,以便根据历史购买行为进行推荐
- 当然也可以使用redis代替updateStateByKey函数做状态追踪,优缺点:
- 使用Redis作为缓存的优点包括:
1.1 内存存储:Redis使用内存存储数据,因此读写速度非常快,适用于大规模数据处理
1.2 数据结构丰富:Redis提供了多种数据结构,如键值对、列表、哈希等,可以方便地存储和更新状态。
分布式部署:Redis支持分布式部署,可以扩展处理能力,适用于大规模数据处理场景
1.3 然而,使用Redis作为缓存也有一些挑战和注意事项:数据一致性,集成和同步:需要编写相应的代码来处理状态更新和查询操作,并确保与Spark Streaming的集成和同步 - 使用updateStateByKey函数的优点包括:
2.1 状态管理:updateStateByKey函数提供了状态管理的功能,可以根据键更新状态,适用于有状态的转换操作
2.2 容错性:Spark Streaming提供了容错机制,可以保证在系统故障时数据的正确恢复
2.3 集成方便:updateStateByKey函数是与Spark Streaming紧密集成的,使用起来比较方便
2.4 然而,使用updateStateByKey函数也有一些挑战和注意事项:状态大小限制:updateStateByKey函数的状态是保存在每个executor上的,如果状态过大,可能会占用大量的内存。状态持久化:对于有状态的转换操作,需要将状态持久化存储,以便在系统故障时能够正确恢复。状态一致性
transform 算子
- transform算子可以支持任意的RDD到RDD的映射操作
JavaPairDStream<String, String> pairDStream = windowLines.transformToPair(
(Function<JavaRDD<String>, JavaPairRDD<String, String>>) rdd -> {
// 进行转换, 模拟访问外部数据库
System.out.println("模拟外部访问");
return rdd.mapPartitionsToPair(s -> {
// 将每个单词转换为(key, value)格式,value为1
List<Tuple2<String, String>> tupleList = new ArrayList<>();
while (s.hasNext()) {
String ele = s.next();
String[] words = ele.split(" ");
tupleList.add(new Tuple2<>(words[0], ele));
}
return tupleList.iterator();
}
);
}
);
Join 算子
- 一个数据流可以和另一个数据流直接关, stream1的每个批次中的RDD会和stream2相应批次中的RDD进行join。同样,你可以类似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
- 基于窗口join
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
DStream输出算子
- 输出算子可以将 DStream 的数据推送到外部系统,如:数据库或者文件系统。因为输出算子会将最终完成转换的数据输出到外部系统,因此只有输出算子调用时,才会真正触发 DStream transformation 算子的真正执行
输出算子 | 用途 |
---|---|
print() | 在驱动器(driver)节点上打印DStream每个批次中的头十个元素。Python API 对应的Python API为 pprint() |
saveAsTextFiles(prefix, [suffix]) | 将DStream的内容保存到文本文件。每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]” |
saveAsObjectFiles(prefix, [suffix]) | 将DStream内容以序列化Java对象的形式保存到顺序文件中。每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
saveAsHadoopFiles(prefix, [suffix]) | 将DStream内容保存到Hadoop文件中。每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python |
foreachRDD(func) | 这是最通用的输出算子了,该算子接收一个函数func,func将作用于DStream的每个RDD上。func应该实现将每个RDD的数据推到外部系统中,比如:保存到文件或者写到数据库中。注意,func函数是在streaming应用的驱动器进程中执行的,所以如果其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。 |
- 般来说,连接对象是有时间和资源开销限制的。因此,对每条记录都进行一次连接对象的创建和销毁会增加很多不必要的开销,同时也大大减小了系统的吞吐量。一个比较好的解决方案是使用 rdd.foreachPartition – 为RDD的每个分区创建一个单独的连接对象
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
测试使用的完整代码
- 这里是测试用的代码并不会太重注规范,只是为了说明API以及对应使用
- maven,版本用的比较旧是跟当前公司线上业务保持一致,以方便做总结
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.6</version>
</dependency>
- 涉及的Bean
public class TestDTO implements Serializable {
private Integer cnt;
private List<String> dates;
private String firstKey;
public TestDTO(Integer cnt, List<String> dates, String firstKey) {
this.cnt = cnt;
this.dates = dates;
this.firstKey = firstKey;
}
public Integer getCnt() {
return cnt;
}
public void setCnt(Integer cnt) {
this.cnt = cnt;
}
public List<String> getDates() {
return dates;
}
public void setDates(List<String> dates) {
this.dates = dates;
}
public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this.firstKey = firstKey;
}
}
- 测试用的代码, 相对比较完整,具体可看注释
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;
import java.text.SimpleDateFormat;
import java.util.*;
public class SparkStreamingDemo {
public static void main(String[] args) throws InterruptedException {
// 创建SparkConf对象
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingDemo");
// 创建JavaStreamingContext对象,设置批处理间隔为5秒
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
// 创建一个随机生成数据的DStream
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver());
// 窗口函数比较常用, 这个窗口表示每隔一分钟执行一次, 处理两分钟窗口(当前执行时间往前加)的数据,eg: 11:04:50执行一次foreach输出
// 11:05:50执行第二次foreach输出, 里面计算的数据都是前两分钟内的,比如11:05:50 数据时间是从11:03:54-11:05:49
JavaDStream<String> windowLines = lines.window(Durations.minutes(2), Durations.minutes(1));
// 转换为PairDStream transformToPair的使用demo, 弄成String, Integer <key, 1> transformToPair + mapPartitionsToPair
// 适用于涉及大量初始化工作或与外部系统交互的函数;而mapToPair操作的是RDD的单个元素,适用于需要将数据转化成键值对进行后续操作的场景
// mapPartitionsToPair mapPartitions没ToPair返回对象而不是map
JavaPairDStream<String, String> pairDStream = windowLines.transformToPair(
(Function<JavaRDD<String>, JavaPairRDD<String, String>>) rdd -> {
// 这个map正常是个单例类并且可以考虑定时刷新数据库数据,这边举例,broadcastVar会被广播,不用再从db读取
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
Map<String, String> x = new HashMap<>();
x.put("sf", "SDfsdf");
Broadcast<Map<String, String>> broadcastVar = jsc.broadcast(x);
// 进行转换, 模拟访问外部数据库, 这块在driver执行
System.out.println("模拟外部访问");
return rdd.mapPartitionsToPair(s -> {
// 将每个单词转换为(key, value)格式,value为1,, 这块在work执行
List<Tuple2<String, String>> tupleList = new ArrayList<>();
while (s.hasNext()) {
String ele = s.next();
String[] words = ele.split(" ");
tupleList.add(new Tuple2<>(words[0], ele));
}
return tupleList.iterator();
}
);
}
);
// 与pairDStream的key相同, 多线程处理, 弄成String, Integer <key, <1,1,1...>>
// groupByKey: 这个操作会将具有相同键的所有值聚集到一个迭代器中。它将整个数据集分区,然后进行网络传输,这可能会导致大量的网络 I/O,从而导致性能瓶颈。因此,如果你只是想对每个键的元素进行归约或聚合操作,那么通常更推荐使用 reduceByKey 或 combineByKey。
// combineByKey: 这个操作会在每个分区上使用一个给定的 combine 函数将值合并到一起,然后再将结果合并到一起,这样可以减少网络传输的数据量,从而提高效率。combineByKey 允许使用者返回与输入的值类型不同的类型,因此它比 groupByKey 和 reduceByKey 更为通用。
JavaPairDStream<String, List<String>> combineDStream = pairDStream.combineByKey(
(Function<String, List<String>>) value -> {
List<String> list = new ArrayList<>();
list.add(value);
return list;
},
(Function2<List<String>, String, List<String>>) (list, value) -> {
list.add(value);
return list;
},
new Function2<List<String>, List<String>, List<String>>() {
@Override
public List<String> call(List<String> list1, List<String> list2) {
list1.addAll(list2);
return list1;
}
},
new HashPartitioner(4)
);
combineDStream.foreachRDD(new CombinedFunction2Test());
// 统计结果
JavaDStream<TestDTO> resultDStream = combineDStream.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, List<String>>>, TestDTO>() {
@Override
public Iterator<TestDTO> call(Iterator<Tuple2<String, List<String>>> tuple2Iterator) throws Exception {
List<TestDTO> list = new ArrayList<>();
while (tuple2Iterator.hasNext()) {
Tuple2<String, List<String>> tuple = tuple2Iterator.next();
List<String> dates = new ArrayList<>();
List<String> res = tuple._2();
Set<String> date = new HashSet<>();
for (String single : res) {
String[] parse = single.split(" ");
String dateSingle = parse[3];
date.add(dateSingle);
dates.add(dateSingle);
}
list.add(new TestDTO(date.size(), dates, tuple._1()));
}
return list.iterator();
}
});
// 数据聚合全部写入,中间数据作为分析用, 一个分区就好不并发写入
resultDStream.repartition(1).foreachRDD(new CombinedFunction33Test());
// 最后再以时间维度聚合一个个map, 以cnt聚合类似select (select from group by A,B) from group by A, 简化处理这边
resultDStream.filter(
(Function<TestDTO, Boolean>) dto -> dto.getFirstKey().equals("127")
).mapToPair(
(PairFunction<TestDTO, String, TestDTO>) dto -> new Tuple2(dto.getCnt().toString(), dto)
).groupByKey().repartition(1).foreachRDD(new CombinedFunction5Test());
// 启动Streaming应用程序
jssc.start();
// 等待程序运行完成
jssc.awaitTermination();
}
// 自定义Receiver生成随机数据源
public static class CustomReceiver extends Receiver<String> {
private boolean running = true;
public CustomReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
new Thread(this::generateData).start();
}
@Override
public void onStop() {
running = false;
}
private void generateData() {
Random random = new Random();
List<Integer> list = new ArrayList<>();
list.add(127);
list.add(128);
while (running) {
// 生成随机字符串
Date date = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
String strDate = formatter.format(date);
int randomIndex = random.nextInt(list.size());
String data = list.get(randomIndex) + " " + "test" + " " + "seeger " + strDate;
store(data);
try {
// 控制生成速度
Thread.sleep(5200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static class CombinedFunction33Test implements VoidFunction2<JavaRDD<TestDTO>, Time> {
@Override
public void call(JavaRDD<TestDTO> stringListJavaPairRDD, Time time) throws Exception {
String uuid = UUID.randomUUID().toString();
System.out.println("当前时间parse__ " + new Date(System.currentTimeMillis()) + "____" + uuid);
// 获取JavaRDD的分区数
int numPartitions = stringListJavaPairRDD.partitions().size();
System.out.println("Number of partitions: " + numPartitions + "____" + uuid);
stringListJavaPairRDD.foreachPartition(ele -> {
while (ele.hasNext()) {
TestDTO res = ele.next();
System.out.println("当前测试的数据parse" + "___Res__" + res.getFirstKey() + "__" + res.getCnt() + "___" + res.getDates().toString() + "___" + uuid);
}
});
System.out.println("结束时间parse___" + "____" + uuid);
}
}
public static class CombinedFunction2Test implements VoidFunction2<JavaPairRDD<String, List<String>>, Time> {
@Override
public void call(JavaPairRDD<String, List<String>> stringListJavaPairRDD, Time time) throws Exception {
String uuid = UUID.randomUUID().toString();
System.out.println("当前时间__ " + new Date(System.currentTimeMillis()) + "____" + uuid);
// 获取JavaRDD的分区数
int numPartitions = stringListJavaPairRDD.partitions().size();
System.out.println("Number of partitions: " + numPartitions + "____" + uuid);
stringListJavaPairRDD.foreachPartition(ele -> {
while (ele.hasNext()) {
Tuple2<String, List<String>> res = ele.next();
System.out.println("当前测试的数据" + "___Res__" + res._1() + "___" + res._2().toString() + "___" + uuid);
}
});
System.out.println("结束时间___" + "____" + uuid);
}
}
public static class CustomKeyPartitioner extends Partitioner {
private final int numPartitions;
public CustomKeyPartitioner(int numPartitions) {
this.numPartitions = numPartitions;
}
@Override
public int numPartitions() {
return numPartitions;
}
@Override
public int getPartition(Object key) {
if (key instanceof Tuple2) {
return ((Tuple2) key)._1().hashCode() % numPartitions;
}
return key.hashCode() % numPartitions;
}
}
public static class CombinedFunction5Test implements VoidFunction2<JavaPairRDD<String, Iterable<TestDTO>>, Time> {
@Override
public void call(JavaPairRDD<String, Iterable<TestDTO>> stringListJavaPairRDD, Time time) throws Exception {
String uuid = UUID.randomUUID().toString();
System.out.println("当前时间parse final res__ " + new Date(System.currentTimeMillis()) + "____" + uuid);
// 获取JavaRDD的分区数
int numPartitions = stringListJavaPairRDD.partitions().size();
System.out.println("Number of partitions: " + numPartitions + "____" + uuid);
stringListJavaPairRDD.foreachPartition(ele -> {
while (ele.hasNext()) {
Tuple2<String, Iterable<TestDTO>> res = ele.next();
Iterable<TestDTO> resFor = res._2();
resFor.forEach(single ->
System.out.println("当前测试的数据parse final res" + "___Res key__" + res._1() + "____" + single.getFirstKey() + "__" + single.getCnt().toString() + "___" + single.getDates().toString() + "___" + uuid));
}
});
System.out.println("结束时间parse final res___" + "____" + uuid);
}
}
}
缓存,持久化
- 和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)
- 对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错
- 注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中
检查点
- 一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据:
- 元数据检查点: 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)主要用途是用于在故障后回复应用程序本身。元数包括:
Configuration – 创建Streaming应用程序的配置信息。
DStream operations – 定义流式处理逻辑的DStream操作信息。
Incomplete batches – 已经排队但未处理完的批次信息 - 数据检查点: 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间
- 总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复
- 如果有以下情况出现,你就必须启用检查点了:
- 用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow,你都必须配置检查点目录来周期性地保存RDD检查点。
- 支持驱动器故障中恢复: 这时候需要元数据检查点以便恢复流式处理的进度信息
- 创建检查点,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍
// 创建StreamingContext
StreamingContext ssc = new StreamingContext(sparkConf, Seconds(10));
// 设置检查点目录
String checkpointDir = "hdfs://localhost:9000/checkpoint";
ssc.checkpoint(checkpointDir);
cache函数
- 在Spark Streaming中,cache()函数用于将数据持久化,以便在后续操作中重复使用。通过缓存数据,Spark可以避免在每次处理数据时重新计算或重新加载数据,从而提高处理效率
- 使用cache()函数可以使得RDD或DStream持久化在内存中,这样在进行后续操作时可以直接访问缓存中的数据,而不需要重新计算或重新加载
inputStream.cache()
累加器, 广播变量
- 代码示例
JavaPairDStream<String, String> pairDStream = windowLines.transformToPair(
(Function<JavaRDD<String>, JavaPairRDD<String, String>>) rdd -> {
// 这个map正常是个单例类并且可以考虑定时刷新数据库数据,这边举例,broadcastVar会被广播,不用再从db读取
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
Map<String, String> x = new HashMap<>();
x.put("sf", "SDfsdf");
Broadcast<Map<String, String>> broadcastVar = jsc.broadcast(x);
// 业务代码
}
);
- 在Spark Streaming中,累加器(Accumulator)和广播变量(Broadcast Variable)是两种常用的优化技术,用于提高处理效率和减少网络通信开销
- 累加器(Accumulator):累加器是一种在Spark作业中用于累加计算结果的变量。它们被设计为只写(只能增加其值)的变量,并且可以在并行计算中安全地共享。累加器的主要用途是在Spark Streaming应用程序中跟踪和统计全局状态
// 创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(10))
// 创建累加器
val counter = ssc.sparkContext.longAccumulator("counter")
// 创建DStream并处理数据
val inputStream = ssc.socketTextStream("localhost", 9999)
inputStream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
// 在每个分区中进行累加计算
partition.foreach { record =>
counter.add(1) // 增加累加器的值
}
}
}
- 广播变量是一种在Spark中高效共享大量数据的方式。当您需要在Spark作业的多个节点之间共享大量只读数据时,使用广播变量可以避免在每个节点上都复制一份数据,从而显著减少网络通信开销和内存消耗
- 广播变量通过将数据发送到集群的每个节点一次,并在本地缓存这些数据,使得这些数据可以在集群的所有节点上高效地访问。这对于在并行计算中需要频繁访问的数据非常有用,类似这种driver读取数据,executor广播
spark.sparkContext().broadcast(jdbcParams);
spark.readStream()
.jdbc(jdbcParams, "(SELECT mytable.*, row_number() OVER (ORDER BY id) AS row_num FROM mytable) AS mytable", jdbcOptions);
可靠性
- RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的血统信息,其中每个算子都以可容错的数据集作为输入数据
- 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的血统信息以及相应的原始输入数据集重新计算出来。
假定所有RDD transformation算子计算过程都是确定性的,那么通过- 这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关 - 接收并保存了副本的数据 – 数据不会因为单个worker节点故障而丢失,因为有副本
- 接收但尚未保存副本数据 – 因为数据并没有副本,所以一旦故障,只能从数据源重新获取
- Worker节点故障 – 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失
- Driver节点故障 – 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失
exaclty once语意
- 数据接收: 不同数据源提供的保证不同
- 如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次
- 可靠接收器比如kafka等 – 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据
- 不可靠接收器 – 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据,为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write ahead logs enabled),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证
- 数据转换: 所有的数据都会被 exaclty once处理,这要归功于RDD不可变提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容
- 数据推送: 输出操作默认保证exaclty once的语义,是否能exaclty once还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义,比如用redis存操作序列是否被处理过,mysql乐观锁等保证幂等,举例用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。
基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交
}
}
性能调优
数据接收并发度
- 每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子
- 另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数(configuration parameter)spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了
数据处理并发度
- 在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量
任务启动开销
- 如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销:
- 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。
- 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。详见Running on Mesos guide。
- 这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能
设置合适的批次间隔
- 要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间
- 根据 Spark Streaming 计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力,要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试
内存调优
- DStream持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗
- 清除老数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的。Spark Streaming会根据所使用的transformation算子来清理老数据。例如,你用了一个窗口操作处理最近10分钟的数据,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以设置 streamingContext.remember 以保留更长时间段的数据(比如:你可能会需要交互式地查询更老的数据)。
- CMS垃圾回收器(CMS Garbage Collector): 为了尽量减少GC暂停的时间,我们强烈建议使用CMS垃圾回收器(concurrent mark-and-sweep GC)。虽然CMS GC会稍微降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个比较恒定的水平上
缓存
- 在使用Spark Streaming进行状态更新时,经常会使用repartition()方法来重新分区RDD,以便在更新状态之前将数据均匀地分布到集群中的不同节点上而调用cache()方法可以将repartition()方法生成的RDD缓存在内存中,以便后续的操作可以快速访问数据。通过使用repartition()和cache()结合使用,可以提高状态更新的效率。repartition()方法可以减少数据倾斜的问题,将数据平均分布到集群中的节点上。而cache()方法可以将数据缓存在内存中,避免重复计算,提高访问数据的速度。这两个方法的组合可以提高状态更新的并行度和速度
- 另外,调用repartition()方法会生成一个新的RDD,而调用cache()方法可以将RDD缓存在内存中,并返回一个新的RDD。因此,使用repartition().cache()的组合可以保证每次状态更新使用的是新的RDD,并且可以通过RDD的依赖关系保留缓存。这样可以在更新状态时减少计算量,提高效率。总之,通过在Spark Streaming中使用repartition().cache()的组合,可以优化状态更新的并行度和速度,提高任务的执行效率
- 数据倾斜:如果你的数据在某些键上倾斜,这意味着某一部分的工作量过重,那么你可以使用repartition来重新分配数据,使得每个分区的数据量均衡,进而提高处理效率。
- 动态调整并行度:如果你发现当前的并行度(分区数)过大或过小,导致资源利用率不佳,那么可以使用repartition来动态调整并行度
- 针对宽转换操作:如果你的操作是一个宽转换操作(如 join、groupByKey 等),这些操作在执行时需要大量的数据洗牌(shuffle),此时使用repartition可以减少洗牌的数据
- 提高缓存效率:如果你有一个持久化的数据集并且这个数据集会被多次使用,那么使用repartition可以将数据集分散到多个节点上,提高缓存效率
-提高数据本地性:如果你的数据分布在不同的节点上,那么使用repartition可以提高数据本地性,减少网络传输
上线spark streaming实时作业
- 我们需要将测试代码其中setMaster代码去掉
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingDemo");
- 改成
SparkConf conf = new SparkConf().setAppName("SparkStreamingDemo");
- 然后打包成jar包,可以用maven-assembly-plugin把依赖打进去,然后像服务器spark集群的master提交作业
线上一般有界面支持提交,类似DolphinScheduler
- 这边如果图省事可以直接把DolphinScheduler部署跟master一台机器,master故障可以手动一起切,这样子就可以手动直接界面spark submit而且没有网络问题,当然最正常的应该跟master不混部避免影响master, 不混部要解决不同机器网络通信问题
- 这边不过多介绍Spark集群安装和DolphinScheduler,这里有一台腾讯云服务器A,2C4G,已简单安装了Spark集群,然后用命令的形式模拟DolphinScheduler界面上传,具体步骤
- 上机器A开启spark集群,在spark安装目录sbin底下,刚启动master的界面如下
./start-all.sh

- 然后我们上机器A像master提交作业,因为机器性能有限,所以提交时做了很多限制,不然资源启动不起来,并且设置了jvm参数,打印gc和oom dump
nohup /usr/local/spark/bin/spark-submit
--master spark://localhost:7077
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+PrintGCDateStamps -XX:NumberOfGCLogFiles=5 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/spark -Xloggc:/var/log/spark/gc.log"
--driver-memory 500m
--executor-memory 500m
spark_demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar > output.log &
-
提交之后我们再次看下master的界面,多了运行中的应用,我们点进去
提交后.png
-
这边启动之后更详细的作业运行情况, 不同作业不同运行中的应用driver地址不同,这边资源有限不举例多个作业的情况, job表示每个foreach算子等,每个job有不同stage,每个stage里面可能并行执行,每个stage一般以groupKey, repartition等算子分,Storage就是RDD分区
更详细的作业运行情况.png
不同stage.png
-
可以看到master按照命令帮我启动了一个driver,driver启动了一个executor
executor.png
-
spark streming有更详细的每个批次统计监控信息
统计监控信息.png
-
gc日志也正常记录输出
gc日志.png
- rdd外面的日志在我们启动时nohup.out里面


- rdd日志在work底下, work目录跟界面上appId对应
/usr/local/spark/work/app-20231223101158-0000/0


上线spark离线作业
离线作业代码示例
- 本地运行记得加下setMaster local这种
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class WordCountOffLineDemo {
public static void main(String[] args) {
// 创建SparkConf对象,配置Spark属性
SparkConf sparkConf = new SparkConf().setAppName("WordCount");
// 创建JavaSparkContext对象,是spark的java编程入口
JavaSparkContext context = new JavaSparkContext(sparkConf);
// 创建一个包含随机单词的列表
List<String> data = Arrays.asList("apple banana", "orange apple", "banana orange", "apple orange", "orange banana", "banana apple");
// 通过parallelize方法,将内存中的集合创建成RDD
JavaRDD<String> lines = context.parallelize(data);
// 切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
// 将单词和1组合
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> new Tuple2<>(word, 1));
// 计算每个单词出现的次数
JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(Integer::sum);
// 打印结果
result.foreach(tuple -> System.out.println(tuple._1 + " : " + tuple._2));
// 关闭context
context.stop();
}
}
master界面
-
模拟DolphinScheduler提交之后显示有运行中的应用
master界面.png
-
模拟DolphinScheduler定时一定时间自动提交就有很多完成的任务
master界面1.png
详细界面
-
因为是离线作业所以没像Spark Streming有那边多监控
整体.png
image.png
网友评论