美文网首页Spark学习笔记
13.Spark学习(Python版本):DStream输出操作

13.Spark学习(Python版本):DStream输出操作

作者: 马淑 | 来源:发表于2018-09-01 15:00 被阅读279次
把DStream输出到文本文件中

为了不破坏以前的代码,我们单独复制上面这些代码到新的文件中,执行如下代码:
cp ./NetworkWordCountStateful.py ./DstreamToText.py
使用vim编辑器打开DstreamToText.py代码文件:
vim ./DstreamToText.py
尝试把DStream内容保存到文本文件中,可以使用如下语句:
running_counts.saveAsTextFiles(“file:///usr/local/spark/mycode/streaming/output.txt”)
将其加入到DstreamToText.py中:

from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
 
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" ")) .map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDD=initialStateRD)
    running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
    running_counts.pprint()
 
    ssc.start()
    ssc.awaitTermination()

运行此程序,执行如下命令:
python ./DstreamToText.py localhost 9999

打开另外一个终端,作为单词产生的源头,提供给NetworkWordCountStateful程序进行词频统计:

nc -lk 9999
//请手动输入一些单词,可以随便输入,比如下面是笔者输入的单词
hadoop
spark
hadoop
spark
hadoop
spark

这个时候,你再去看刚才运行NetworkWordCountStateful程序的监听窗口,就可以看到类似下面的词频统计结果:



请执行如下命令:

cd /usr/local/spark/mycode/streaming/
ls

可以发现,在这个目录下,生成了很多文本文件,如下:



我们可以查看一下某个output.txt下面的内容:

cat output.txt-1535780207000/*
('hadoop', 3)
('hello', 1)
('spark', 2)
('world', 1)

说明我们已经成功地把DStream输出到文本文件了。

把DStream写入到MySQL数据库中

执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建:

service mysql start
mysql -u root -p

在我们之前“[通过JDBC连接数据库的内容中,我们已经在MySQL数据库中创建了一个名称为“spark”的数据库和名称为“student”的表。这里,我们可以直接使用这个已经创建好的“spark”数据库,然后,在这个数据库中创建一个名称为“wordcount”的表,命令如下:

mysql> create database sparkdb;
mysql> use sparkdb;
mysql> create table wordcount (word char(20), count int(4));

由于需要python连接到mysql的模块,所以请打开新的Terminal执行如下命令:
sudo pip3 install PyMySQL

对NetworkWordCountStateful.py代码文件进行修改,为了方便起见,建立副本DstreamToDB.py,再用vim编辑器新建该文件:

cd /usr/local/spark/python_code/streaming
cp NetworkWordCountStateful.py DstreamToDB.py
vim DstreamToDB.py

DstreamToDB.py代码修改如下:

from __future__ import print_function
 
import sys
 
import pymysql
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/python_code/streaming/")
 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
 
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
 
    running_counts.pprint()
 
    def dbfunc(records):
        db = pymysql.connect("localhost","root","pwd","sparkdb")
        cursor = db.cursor()
 
        def doinsert(p):
            sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)                
                db.commit()
            except:                    
                db.rollback()
 
        for item in records:
            doinsert(item)
 
    def func(rdd):
        repartitionedRDD = rdd.repartition(3)
        repartitionedRDD.foreachPartition(dbfunc)
 
    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()

保存退出vim编辑器。先执行一下程序,看看效果。
python DstreamToDB.py localhost 9999


执行上面命令以后,就进入监听状态。下面我们打开另外一个终端,运行nc:
nc -lk 9999
//现在你就可以在当前窗口内随意输入单词,输入一个单词就回车,比如输入下面单词
hello
hadoop
spark
hello
spark

在mysql命令提示符下输入查看命令,就可以查看spark.wordcount表内当前最新的数据:


图片.png

相关文章

网友评论

    本文标题:13.Spark学习(Python版本):DStream输出操作

    本文链接:https://www.haomeiwen.com/subject/zuxuwftx.html