美文网首页
Spark基础使用、配置总结

Spark基础使用、配置总结

作者: nlpming | 来源:发表于2020-09-06 18:06 被阅读0次

    spark上传附件、加载py文件

    1. 第一种方式: 在spark-submit中加载

    spark-submit \
      --queue xxx \
      --archives ch_cut.zip#ch_cut \
      --py-files label.py \
      test.py
    

    2. 第二种方式: 在py脚本中加载

    sc.addFile("/user/data/py_module/normal", recursive=True)  # 添加文件夹
    

    spark加载自定义python环境

    1. 打包anaconda环境

    # 1. 创建虚拟环境
    conda create -p ~/anaconda_test --copy -y -q python=2.7
    
    # 2. 激活环境安装包
    source activate /home/xiaoming/anaconda_test
    
    # 3. 开始打包环境
    zip -p -r anaconda_test.zip anaconda_test
    

    2. spark-submit传入python环境路径,脚本中配置python环境

    (1)spark-submit传入python环境路径

    PYSPARK_DRIVER_PYTHON=~/anaconda_test/bin/python \
    spark-submit \
      --queue xxx \
      --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
      -- conf spark.port.maxRetries=300 \
      --archives anaconda_test.zip#anaconda_test \
      test.py xxx xxx
    

    (2)py脚本中配置Python环境

    spark = SparkSession.builder \
      .appName("test.py") \
      .enableHiveSupport() \
      .getOrCreate()
    sc = spark.sparkContext
    sc.pythonExec = spark.conf.get("spark.yarn.appMasterEnv.PYSPARK_PYTHON")
    

    spark-submit 常用参数配置

    PYSPARK_DEIVER_PYTHON=~/anaconda_test/bin/python \
    spark-submit \
      --queue xxx \
      --name test.py \
      --deploy-mode client \
      --master yarn \
      --driver-memory 4g \
      --executor-memory 12g \
      --num-executors 100 \
      --executor-cores 2 \
      --archives ./anaconda_test.zip#mypython \ 
      --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./mypython/anaconda_test/bin/python \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.dynamicAllocation.maxExecutors=200 \
      --conf spark.default.parallelism=9600 \
      --conf spark.port.maxRetries=100 \
      --conf spark.storage.memoryFraction=0.5 \
      --conf spark.shuffle.memoryFraction=0.3 \
      test.py
    

    参数说明:

    • --queue:设置队列名称;
    • --name:设置application名称;
    • --deploy-name:driver进程部署模式,一般为client或cluster,默认为client;
    • --master:集群资源管理,一般设置为yarn;
    • --driver-memory:driver进程使用的内存;
    • --executor-memory:executor进程使用的内存;
    • --num-executors:设置spark作业总共要用多少个executor进程来执行;
    • --executor-cores:每个executor进程使用的cpu核数;
    • --archives:需要传送到executor的附件;
    • spark.yarn.appMasterEnv.PYSPARK_PYTHON=xxx:设置使用的Python环境;
    • spark.dynamicAllocation.enabled=true:是否动态分布资源;
    • spark.default.parallelism=9600:用于设置每个stage默认task线程数量;
    • spark.port.maxRetries=100:端口最大重试次数;
    • spark.storage.memoryFraction=0.5:设置rdd持久化数据在executor内存中占的比例,默认为0.6;
    • spark.shuffle.memoryFraction=0.3:设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor的内存比例;

    Spark UI简介

    Spark UI界面.png

    1. Jobs页面: 此页面可以看到当前spark启动了多少个job;
    2. Stages页面: 此页面可以看到每个job,启动了多少个stage;job是根据spark任务中有多少个action操作得到的;每个job由多个stage组成,是根据shuffle操作数得到的;
    3. Storage页面:所有代码中cache, persist等操作可以在这里看到,可以看到当前使用了多少缓存;
    4. Environment页面:此页面展示了spark所依赖的环境,比如jdk,lib等;spark任务所设置的参数;
    5. Executors页面:spark任务使用的资源汇总;

    spark 查找异常数据技巧

    • try...except... 将错误捕获,并在except中返回;
    • 然后使用rdd.take(5),查看错误数据;
    def get_json(line):
      try:
        item = json.loads(line)
        return False
      except:
        return line
    

    pyspark使用C++模块

    current_dir = os.getcwd()
    script_dir = os.path.split(os.path.realpath(__file__))[0]  # 获取当前脚本路径
    os.chdir(script_dir)
    
    lib = CDLL("./xxx.so")  # 加载so文件
    
    os.chdir(current_dir)
    

    yarn杀死spark任务

    yarn application -kill application_1428487296152_25597
    

    spark文件压缩存储

    1. bzip2压缩格式: org.apache.hadoop.io.compress.BZip2Codec;特点:压缩率最高,压缩解压速度慢,支持split;
    2. snappy压缩格式: org.apache.hadoop.io.compress.SnappyCodec;特点:json文本压缩率38.25%,压缩和解压时间短;
    3. gzip压缩格式: org.apache.hadoop.io.compress.GzipCodec; 特点:压缩率高,压缩和解压速度快,不支持split;json文本压缩率23.5%,适合使用率低,长期存储的文件;

    sc.textFile读取多个目录

    第一种方式:
    sc.textFile("xxx,xxx,xxxx")
    
    第二种方式:
    from datetime import datetime
    
    all_rdd = sc.parallelize([])
    start_day = datetime.strptime("20200801", "%Y%m%d") # 开始时间
    ndays = 10 # 往前多少天
    
    for i in range(ndays):
      now_day = start_day + datetime.timedelat(days=-i)
      now_day_str = now_day.strftime("%Y%m%d")
      input_path = os.path.join("/home/test", now_day_str)
      rdd = sc.textFile(input_path)
      all_rdd = all_rdd.union(rdd)
    

    参考资料

    相关文章

      网友评论

          本文标题:Spark基础使用、配置总结

          本文链接:https://www.haomeiwen.com/subject/rlhisktx.html