spark上传附件、加载py文件
1. 第一种方式: 在spark-submit中加载
spark-submit \
--queue xxx \
--archives ch_cut.zip#ch_cut \
--py-files label.py \
test.py
2. 第二种方式: 在py脚本中加载
sc.addFile("/user/data/py_module/normal", recursive=True) # 添加文件夹
spark加载自定义python环境
1. 打包anaconda环境
# 1. 创建虚拟环境
conda create -p ~/anaconda_test --copy -y -q python=2.7
# 2. 激活环境安装包
source activate /home/xiaoming/anaconda_test
# 3. 开始打包环境
zip -p -r anaconda_test.zip anaconda_test
2. spark-submit传入python环境路径,脚本中配置python环境
(1)spark-submit传入python环境路径
PYSPARK_DRIVER_PYTHON=~/anaconda_test/bin/python \
spark-submit \
--queue xxx \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
-- conf spark.port.maxRetries=300 \
--archives anaconda_test.zip#anaconda_test \
test.py xxx xxx
(2)py脚本中配置Python环境
spark = SparkSession.builder \
.appName("test.py") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sc.pythonExec = spark.conf.get("spark.yarn.appMasterEnv.PYSPARK_PYTHON")
spark-submit 常用参数配置
PYSPARK_DEIVER_PYTHON=~/anaconda_test/bin/python \
spark-submit \
--queue xxx \
--name test.py \
--deploy-mode client \
--master yarn \
--driver-memory 4g \
--executor-memory 12g \
--num-executors 100 \
--executor-cores 2 \
--archives ./anaconda_test.zip#mypython \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=200 \
--conf spark.default.parallelism=9600 \
--conf spark.port.maxRetries=100 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
test.py
参数说明:
-
--queue
:设置队列名称; -
--name
:设置application名称; -
--deploy-name
:driver进程部署模式,一般为client或cluster,默认为client; -
--master
:集群资源管理,一般设置为yarn; -
--driver-memory
:driver进程使用的内存; -
--executor-memory
:executor进程使用的内存; -
--num-executors
:设置spark作业总共要用多少个executor进程来执行; -
--executor-cores
:每个executor进程使用的cpu核数; -
--archives
:需要传送到executor的附件; -
spark.yarn.appMasterEnv.PYSPARK_PYTHON=xxx
:设置使用的Python环境; -
spark.dynamicAllocation.enabled=true
:是否动态分布资源; -
spark.default.parallelism=9600
:用于设置每个stage默认task线程数量; -
spark.port.maxRetries=100
:端口最大重试次数; -
spark.storage.memoryFraction=0.5
:设置rdd持久化数据在executor内存中占的比例,默认为0.6; -
spark.shuffle.memoryFraction=0.3
:设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor的内存比例;
Spark UI简介
Spark UI界面.png1. Jobs页面: 此页面可以看到当前spark启动了多少个job;
2. Stages页面: 此页面可以看到每个job,启动了多少个stage;job是根据spark任务中有多少个action操作得到的;每个job由多个stage组成,是根据shuffle操作数得到的;
3. Storage页面:所有代码中cache, persist等操作可以在这里看到,可以看到当前使用了多少缓存;
4. Environment页面:此页面展示了spark所依赖的环境,比如jdk,lib等;spark任务所设置的参数;
5. Executors页面:spark任务使用的资源汇总;
spark 查找异常数据技巧
- try...except... 将错误捕获,并在except中返回;
- 然后使用rdd.take(5),查看错误数据;
def get_json(line):
try:
item = json.loads(line)
return False
except:
return line
pyspark使用C++模块
current_dir = os.getcwd()
script_dir = os.path.split(os.path.realpath(__file__))[0] # 获取当前脚本路径
os.chdir(script_dir)
lib = CDLL("./xxx.so") # 加载so文件
os.chdir(current_dir)
yarn杀死spark任务
yarn application -kill application_1428487296152_25597
spark文件压缩存储
1. bzip2压缩格式: org.apache.hadoop.io.compress.BZip2Codec
;特点:压缩率最高,压缩解压速度慢,支持split;
2. snappy压缩格式: org.apache.hadoop.io.compress.SnappyCodec
;特点:json文本压缩率38.25%,压缩和解压时间短;
3. gzip压缩格式: org.apache.hadoop.io.compress.GzipCodec
; 特点:压缩率高,压缩和解压速度快,不支持split;json文本压缩率23.5%,适合使用率低,长期存储的文件;
sc.textFile读取多个目录
第一种方式:
sc.textFile("xxx,xxx,xxxx")
第二种方式:
from datetime import datetime
all_rdd = sc.parallelize([])
start_day = datetime.strptime("20200801", "%Y%m%d") # 开始时间
ndays = 10 # 往前多少天
for i in range(ndays):
now_day = start_day + datetime.timedelat(days=-i)
now_day_str = now_day.strftime("%Y%m%d")
input_path = os.path.join("/home/test", now_day_str)
rdd = sc.textFile(input_path)
all_rdd = all_rdd.union(rdd)
参考资料
- Spark Configuration - 官方文档参数说明
http://spark.apache.org/docs/latest/configuration.html - Spark性能优化指南——基础篇
- Spark性能优化指南——高级篇
- [看图说话] 基于Spark UI性能优化与调试——初级篇
https://cloud.tencent.com/developer/article/1021744 - spark & 文件压缩
https://blog.csdn.net/lsshlsw/article/details/51992569
网友评论