前言
其实 Spark Streaming 主要就是把算子用用,多敲代码的事儿。我当时觉得这个Spark Streaming好像要提的事情并不多呀,所以就直接跳过了。然后···
image那我还是回头来扯扯吧😂
一、Spark Streaming
1.1 运行流程
- 首先我们无论是提交Spark core也好,Spark SQL也好,Spark Streaming也罢,我们都会统称为一个 “Application”
- 而且我们要知道,Spark SQL 和 Spark Streaming 的底层都是依赖于Spark Core的,所以我们要使用它们之前,必须先初始化好一个Spark Core的程序入口,从代码中体现的话,就是 StreamingContext 必须依赖于 SparkContext,而SparkContext里面我们必然是会初始化一个Driver的服务的,所以它们的结构就是这个样子
- 以上这些都初始化好之后,我们的worker自然也已经分配好 Executor 了
- 之后Driver服务就会发送 Receiver 对象到 Executor 上,Receiver默认就只有一个,当然也可以通过代码设置为多个
- Receiver启动后它其实就可以视为一个Task任务(这个Task就开始不断接收数据,并封装成block,之后把block写到Executor的内存中)
- Receiver会把这些Block的信息告诉Driver
虽然图很简陋,但是能把信息准确地表达就好。
- Driver会根据一定的时间间隔,把这些 Block 组织成一个 RDD ,其实一个 block 就是一个partition (1个partition -> 1个task)
1.2 BlockInterval 和 BatchInterval
这时候多少数据才会形成一个Block呢?答案是每 200 毫秒形成一个Block
那多久时间会把这些Block合并成一个RDD呢?答案就是你的代码中的那句
<pre language="typescript" code_block="true">// 这里的2是2秒的意思
val ssc = new StreamingContext(conf,Seconds(2));
复制代码</pre>
Driver就会把这2s中的数据看成一个RDD
这就是我们的 BlockInterval 和 BatchInterval ,这两个比较重要的时间参数,一个由默认值200毫秒,一个由我们用户自行控制
我们也可以在官网那儿去查看这些配置的参数
image然后ctrl+f,搜一下blockInterval或者200都可以找到
image暂时需要知道的就这么多,我们来动手写下代码
1.3 入门的WordCount程序
在之前的分享中已经反复提到过了,如果是实时的程序,我们关心的就是它的数据的输入,数据的处理,和数据的输出
1.3.1 POM文件
<pre language="typescript" code_block="true"><properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.1</spark.version>
<hadoop.version>2.7.5</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
复制代码</pre>
1.3.2 数据的输入
我们先要创建Spark Streaming的程序入口
image<pre language="typescript" code_block="true">// 1.数据的输入
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("wordCount")
复制代码</pre>
这里我们来解释一下,第一行就是我们刚刚提到的,创建SparkCore的程序入口,这个没毛病了,setAppName就是设置个名字,爱是啥就是啥,也没啥可说的
1.3.3 补充 setMaster(local[2]) 的扩展
第二行,我们setMaster(local[2]),这里要注意,我们消费Kafka的数据,有 Receiver 和 Direct 两种方式,这两种方式是不一样的。
官网中提供的JAR包是两个,一个是基于0.8版本的整合,这个提供了recevier和direct两种版本的整合,一个是0.10版本整合,只提供了direct方式。
Receiver整合是在Spark的executor当中启动了一个recevier的线程,专门去拉取数据,拉取回来的数据receiver是不会帮忙处理的,所以receiver就是搬东西的,它基于Kafka的high level API进行消费,offset自动保存于zookeeper中,不用自己主动维护。
而此时,拉取数据的线程和处理数据的线程互相是不通信的,当我们处理数据的线程挂掉了之后,拉取数据的线程是感知不到的,它仍在不断拉取数据。这时候数据全部会堆积在executor的内存里面,就会出现问题
Direct方式不再单独启动线程去拉取数据,获取到的数据也不再保存在executor里面,获取到的数据直接进行处理,拉取和处理完全就是一拨人。
当然它也有问题:使用Kafka的low API进行消费,需要手动维护offset的值。老版本中我们还会保存在zookeeper中,新版本默认是存在了Kafka的默认的一个topic里面,当然你为了一些特殊的需求,存在MySQL和Hbase···那些也是有可能的。
此时我们setMaster(local[1])行不行呢?那如果是direct的方式,那是可以的,不过此时我们就是单线程拉取并处理,但是如果是基于 Receiver 的方式进行消费的话,那就完蛋了,local[1]的意思是只启动一个线程的,这时候你就会发现你的程序不报错,可是数据死活不出来的情况,所以大家一定要注意,如果是local[*],那就是电脑的cpu core有多少开多少
之后
<pre language="typescript" code_block="true">val ssc = new StreamingContext(conf, Seconds(2))
复制代码</pre>
获取到SparkStreaming的程序入口,设置2s形成一个RDD,即可
1.3.4 数据的获取及处理
这里其实就是简单的从socket那去获取数据而已
<pre language="typescript" code_block="true">// 2.数据的处理
val dataDStream = ssc.socketTextStream("localhost", 8888)
val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
复制代码</pre>
1.3.5 数据的输出
<pre language="typescript" code_block="true">// 3.数据的输出
result.print()
复制代码</pre>
如果我们进入到企业工作,也会发现基本上我们只需要编写处理数据的那部分代码,前置部分的获取配置和下游的输出基本不会太过于关注
此时我们可以运行代码,用在flink的第二篇提到的netcat去监听8888端口(不是8888,随便也行)即可
image但是此时我们这个统计并不是累计的,累计的要求需要用到一些高级算子
网友评论