Spark Streaming程序基本步骤
编写Spark Streaming程序的基本步骤是:
1.通过创建输入DStream来定义输入源
2.通过对DStream应用转换操作和输出操作来定义流计算。
3.用streamingContext.start()来开始接收数据和处理流程。
4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
5.可以通过streamingContext.stop()来手动结束流计算进程。
创建StreamingContext对象
请登录Linux系统,启动pyspark。进入pyspark以后,就已经获得了一个默认的SparkConext,也就是sc。因此,可以采用如下方式来创建StreamingContext对象:
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 1)
1表示每隔1秒钟就自动执行一次流计算,这个秒数可以自由设定。
如果是编写一个独立的Spark Streaming程序,而不是在pyspark中运行,则需要通过如下方式创建StreamingContext对象:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)
setAppName(“TestDStream”)
- 设置应用程序名称;
setMaster(“local[2]”)
- ”local[2]’表示本地模式,启动2个工作线程。
文件流(DStream) - 命令行中监听
Spark支持从兼容HDFS API的文件系统中读取数据,创建数据流。
为了能够演示文件流的创建,我们需要首先创建一个日志目录(/usr/local/spark/python_code/streaming),并在里面放置两个模拟的日志文件。log1.txt输入:
I love Hadoop
I love Spark
Spark is fast
请另外打开一个终端窗口,启动进入pyspark。
>>> from operator import add
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc,20)
>>> lines = ssc.textFileStream('file:///usr/local/spark/python_code/streaming/logfile')
>>> words = lines.flatMap(lambda line:line.split(' '))
>>> wordCounts = words.map(lambda word:(word,1)).reduceByKey(add)
>>> wordCounts.pprint()
>>> ssc.start()
>>> ssc.awaitTermination()-------------------------------------------
Time: 2018-08-13 20:40:40
-------------------------------------------
>>> ssc.awaitTermination()-------------------------------------------
Time: 2018-08-13 20:41:00
-------------------------------------------
-------------------------------------------
Time: 2018-08-13 20:41:20
-------------------------------------------
-------------------------------------------
Time: 2018-08-13 20:41:40
-------------------------------------------
输入ssc.start()以后,程序就开始自动进入循环监听状态,屏幕上会显示一堆的信息,下面的ssc.awaitTermination()是无法输入到屏幕上的。
Spark Streaming每隔20秒就监听一次。但是,监听程序只监听”/usr/local/spark/mycode/streaming/logfile”目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。所以,为了让我们能够看到效果,需要到”/usr/local/spark/mycode/streaming/logfile”目录下再新建一个log3.txt文件。请打开另外一个终端窗口再新建一个log3.txt文件,里面随便输入一些英文单词,保存后再切换回到spark-shell窗口。
现在你会发现屏幕上不断输出新的信息,导致你无法看清。所以必须停止这个监听程序,按键盘Ctrl+D,或者Ctrl+C。
你可以看到屏幕上,在一大堆输出信息中,你可以找到打印出来的单词统计信息。
文件流(DStream) - 编写独立应用程序进行监听
打开一个Linux终端窗口,进入shell命令提示符状态,然后,执行下面命令:
cd /usr/local/spark/mycode/streaming
vim TestStreaming.py
在TestStreaming.py中输入以下代码:
from operator import add
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[1]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 20)
lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x : (x,1)).reduceByKey(add)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
保存成功后,执行python TestStreaming.py
套接字流(DStream)
Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。
在/usr/local/spark/python_code/streaming
目录下新建NetworkWordCount.py
,输入如下内容:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
保存后,执行:
sudo nc -lk port #永久监听TCP端口
第二个终端作为监听窗口,执行如下代码:
cd /usr/local/spark/mycode/streaming
python3 NetworkWordCount.py localhost 9999
这样,就可以在nc第一个终端窗口窗口中随意输入一些单词,在监听窗口每隔1秒就会打印出词频统计信息。
RDD队列流(DStream)
我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。
打开一个终端,进入Shell命令提示符状态,然后执行下面命令新建代码文件:
在/usr/local/spark/mycode/streaming
目录下创建TestRDDQueueStream.py
输入以下代码:
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingQueueStream")
ssc = StreamingContext(sc, 1)
# Create the queue through which RDDs can be pushed to
# a QueueInputDStream
rddQueue = []
for i in range(5):
rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
# Create the QueueInputDStream and use it do some processing
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
reducedStream.pprint()
ssc.start()
time.sleep(6)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
保存并回到命令行,输入python3 TestRDDQueueStream.py
,得到结果如下:
网友评论