2020-05-09 - Python under yarn-cluster

Author: logi | Published 2020-05-09 18:59

    Packaging the environment

    After creating the environment, cd into the directory where it lives. For example, if the environment is at ***/***/project_env, cd into project_env and use the following command to zip everything in the current directory:
    
    zip -r project_env.zip ./*
    From the same directory, upload the archive to HDFS:
    hadoop fs -put ***/***/project_env/project_env.zip hdfs://***/***/***/env/
    
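    For reference, here is one way the environment at ***/***/project_env could be created before packaging. This is a minimal sketch assuming virtualenv; the original post does not say which tool was used, and a conda environment zipped the same way also works.

    # create and populate the environment that will later be zipped
    virtualenv ***/***/project_env
    source ***/***/project_env/bin/activate
    pip install annoy numpy        # whatever the job imports
    deactivate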

    Submission script

    HOME_PATH=$(cd $(dirname $0);pwd)
    
    
    xx/spark-2.2/bin/spark-submit     \
    --queue xxx \
    --executor-memory 32G \
    --packages com.databricks:spark-csv_2.10:1.5.0 \
    --driver-memory 12G \
    --master yarn-cluster \
    --executor-cores 4 \
    --num-executors 100 \
    --name "xxxx" \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
    --conf spark.executorEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
    --conf spark.yarn.appMasterEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
    --archives hdfs://***/***/***/env/project_env.zip \
    $HOME_PATH/ann_pyspark.py ${version} ${strategy}
    
    
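    The executor-side path ./project_env.zip/bin/python works because, on YARN, every archive passed with --archives is extracted into the container working directory under the archive's file name. If you prefer a shorter name, --archives also accepts a # alias suffix; a sketch of the relevant lines (the alias name project_env is my own choice, not from the original post):

    --archives hdfs://***/***/***/env/project_env.zip#project_env \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./project_env/bin/python \
    --conf spark.executorEnv.PYSPARK_PYTHON=./project_env/bin/python \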

    About LD_LIBRARY_PATH: if importing some packages fails with an error that libffi.so.6 cannot be found, the two LD_LIBRARY_PATH settings above reuse that library from the old (system) Python 2.7 environment.
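    A quick local sanity check before submitting (my own suggestion, not from the original post): confirm that libffi.so.6 actually exists under that directory, and that the packaged interpreter can import ctypes once LD_LIBRARY_PATH is set.

    ls /opt/rh/python27/root/usr/lib64/ | grep libffi
    LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
        ***/***/project_env/bin/python -c "import ctypes; print('ctypes ok')"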

    Python demo

    #coding=utf-8
    
    from annoy import AnnoyIndex
    import pickle
    import re
    import numpy as np
    import json
    import traceback
    import sys
    from pyspark.sql import SparkSession
    from pyspark import SparkFiles
    
    def do(iterator):
        def do_line(line):
            try:
                # the per-line processing logic was elided in the original post;
                # returning the raw line keeps the skeleton runnable
                return line
            except Exception:
                traceback.print_exc()
                return ""
        return (do_line(x) for x in iterator)
    if __name__ == "__main__":
        version = sys.argv[1]
        strategy = sys.argv[2]
        hadoop_path = ""
      
        spark = SparkSession.builder.getOrCreate()
        spark.conf.set("spark.sql.shuffle.partitions", 5000)
        spark.conf.set("spark.hadoopRDD.ignoreEmptySplits", True)
        spark.conf.set("spark.hadoopRDD.targetBytesInPartition", 67108864)
        sc = spark.sparkContext
        sc.setLogLevel("INFO")
    
        # register the HDFS files with the job so every executor can read them locally
        sc.addFile(path1)
        sc.addFile(path2)
    
        result = sc.textFile(path3).repartition(500).mapPartitions(do)
        result.saveAsTextFile(output_path)
    
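    The demo imports SparkFiles and AnnoyIndex, but the body of do_line is elided. A minimal sketch of how files shipped with sc.addFile are typically read back on the executors (the file names index.ann and meta.pkl and the dimension 64 are placeholders of mine, not from the original post):

    from annoy import AnnoyIndex
    from pyspark import SparkFiles
    import pickle

    def load_side_data():
        # SparkFiles.get returns the local path of a file that the driver
        # registered earlier with sc.addFile
        index = AnnoyIndex(64, "angular")        # 64 = assumed vector dimension
        index.load(SparkFiles.get("index.ann"))  # placeholder file name
        with open(SparkFiles.get("meta.pkl"), "rb") as f:  # placeholder file name
            meta = pickle.load(f)
        return index, meta

    In practice load_side_data would be called once at the top of do, before building the generator over the partition's lines.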
