美文网首页
05-pySpark 集群测试

05-pySpark 集群测试

作者: 过桥 | 来源:发表于2019-11-15 09:22 被阅读0次

Windows 单机测试

本地环境

IDE:Sublime Text
Python 3.5
Spark:spark-2.4.4-bin-hadoop2.7

编写测试脚本

import sys
from operator import add
from pyspark.sql import SparkSession
 
if __name__ == "__main__":
 
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
    lines = spark.read.text('file:///E:/C_Code/learngit/Python3_Test/Test/PySpark/文章生成器.txt').rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
 
    spark.stop()

异常、 ImportError: No module named 'pyspark'

pyspark 路径无法识别

# Path for spark source folder
os.environ['SPARK_HOME'] = "D:\S_Software\spark-2.4.4-bin-hadoop2.7"
# Append pyspark to Python Path
sys.path.append("D:\S_Software\spark-2.4.4-bin-hadoop2.7\python")
异常、 ImportError: No module named 'py4j'

py4j 路径无法识别

sys.path.append("D:\S_Software\spark-2.4.4-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip")

参考链接

异常 'gbk' codec can't encode character '\ufffd' in position 0

spark.read.text 读取文件默认UTF-8Windows记事本文件默认ANSI,另存为UTF-8即可

修复后代码

import sys
from operator import add

import os

# Path for spark source folder
os.environ['SPARK_HOME'] = "D:\S_Software\spark-2.4.4-bin-hadoop2.7"
# Append pyspark to Python Path
sys.path.append("D:\S_Software\spark-2.4.4-bin-hadoop2.7\python")
sys.path.append("D:\S_Software\spark-2.4.4-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip")
from pyspark.sql import SparkSession
 
if __name__ == "__main__":
 
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
    lines = spark.read.text('file:///E:/C_Code/learngit/Python3_Test/Test/PySpark/文章生成器.txt').rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    output = counts.collect()
    with open("文章词汇统计.txt","w") as f:
        for (word, count) in output:
            f.writelines(word + ":" + str(count)+'\n')

    spark.stop()

虚拟机集群测试

本地环境

Python 3.6
Spark:spark-2.4.4-bin-hadoop2.7
Spark目录:/home/spark/software/spark-2.4.4-bin-hadoop2.7/
测试文件目录:/home/spark/test
Master节点:mongodb01
计算节点:mongodb01、mongodb02、mongodb03

启动Spark计算集群

[mongodb@mongodb01 bin]$ cd /home/spark/software/spark-2.4.4-bin-hadoop2.7/sbin/
[mongodb@mongodb01 sbin]$ sudo ./start-all.sh

上传样例数据至服务器

# 新建文件夹
[mongodb@mongodb01 sbin]$ sudo mkdir -p /home/spark/test
[mongodb@mongodb01 sbin]$ cd /home/spark/test
# 上传样例数据至服务器
[mongodb@mongodb01 sbin]$ sudo rz

[mongodb@mongodb01 test]$ ll
总用量 32
-rw-r--r--. 1 root root 31237 11月 14 15:34 文章生成器.txt

异常、直接使用 rz 上传失败

可能是没有写入权限,使用 sudo 提升权限上传

修改本地代码

from operator import add
from pyspark.sql import SparkSession
 
if __name__ == "__main__":
 
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
    lines = spark.read.text('file:///home/spark/test/文章生成器.txt').rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    output = counts.collect()
    with open("/home/spark/test/文章词汇统计.txt","w") as f:
        for (word, count) in output:
            f.writelines(word + ":" + str(count)+'\n')

    spark.stop()

上传程序代码至服务器

[mongodb@mongodb01 test]$ sudo rz

[mongodb@mongodb01 test]$ ll
总用量 36
-rw-r--r--. 1 root root   544 11月 14 16:59 spark.read_server.py
-rw-r--r--. 1 root root 31237 11月 14 15:34 文章生成器.txt
[mongodb@mongodb01 test]$ 

单机执行

[mongodb@mongodb01 test]$ /home/spark/software/spark-2.4.4-bin-hadoop2.7/bin/spark-submit ./spark.read_server.py

可观察log执行进度及结果

异常、执行过程中提示写文件失败
Traceback (most recent call last):
  File "/home/spark/test/./spark.read_server.py", line 10, in <module>
    with open("/home/spark/test/文章词汇统计.txt","w") as f:
PermissionError: [Errno 13] Permission denied: '/home/spark/test/文章词汇统计.txt'

修复方法,修改文件夹权限

[mongodb@mongodb01 test]$ sudo chmod -R 777 /home/spark/test

提交集群执行

[mongodb@mongodb01 test]$ /home/spark/software/spark-2.4.4-bin-hadoop2.7/bin/spark-submit \
--master spark://mongodb01:7077 \
--executor-memory 512M \
--name PythonWordCount \
./spark.read_server.py

可观察log与页面http://192.168.153.128:8080执行进度及结果,可能因分配内存较小,执行10分钟才完成任务,后续再考虑部署优化方式。

异常、一直执行,提示检查内存
19/11/14 17:50:31 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
19/11/14 17:50:46 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

修复方法,命令中设置--executor-memory 512M \,内存需小于初始分配内存800M

相关文章

  • 05-pySpark 集群测试

    Windows 单机测试 本地环境 编写测试脚本 异常、 ImportError: No module named...

  • HBase集群平滑迁移步骤

    HBase集群平滑迁移步骤 测试环境 CDH版本HBASE版本测试源集群5.15.11.2.0测试目标集群6.2....

  • k8s服务的暴露方式及区别

    环境:阿里云已经建好k8s 测试集群1,测试集群2测试集群1对应的服务器:A和B测试集群2对应的服务器:C和D 1...

  • kubemark 搭建测试集群和性能测试

    原文链接:kubemark 搭建测试集群 kubemark 搭建测试集群和性能测试 Kubemark是K8s官方提...

  • 【Greenplum】TPC测试指南

    【Greenplum】TPC性能测试指南 1、准备工作 1.1、Greenplum集群 集群版本集群规格集群节点数...

  • 【Greenplum】TPC测试指南

    【Greenplum】TPC性能测试指南 1、准备工作 1.1、Greenplum集群 集群版本集群规格集群节点数...

  • Windows的Jmeter分布式集群压力测试

    Windows和Linux的Jmeter分布式集群压力测试 Windows的Jmeter分布式集群压力测试 在使用...

  • Greenplum安装

    一、环境准备 二、集群安装 三、集群初始化 四、简单测试

  • Redis 4.0集群配置

    Redis 集群,官方方案需要6个节点,3个主3个从。 安装依赖软件 安装redis 集群配置 创建集群 集群测试...

  • 异常Kafka'修复'记

    近期由于测试Flink安全模式,找到一个安全的hadoop&kafka集群,测试过程中发现kafka集群各种异常(...

网友评论

      本文标题:05-pySpark 集群测试

      本文链接:https://www.haomeiwen.com/subject/kljjictx.html