Packaging the environment
After the environment has been created, change into the directory that contains it. For example, if your environment is at ***/***/project_env, cd into project_env and package everything in the current directory:
zip -r project_env.zip ./*
Still in that directory, upload the archive to HDFS:
hadoop fs -put ***/***/project_env/project_env.zip hdfs://***/***/***/env/
Execution script
HOME_PATH=$(cd $(dirname $0);pwd)
xx/spark-2.2/bin/spark-submit \
--queue xxx \
--executor-memory 32G \
--packages com.databricks:spark-csv_2.10:1.5.0 \
--driver-memory 12G \
--master yarn-cluster \
--executor-cores 4 \
--num-executors 100 \
--name "xxxx" \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
--conf spark.yarn.appMasterEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
--conf spark.executorEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
--archives hdfs://***/***/***/env/project_env.zip \
$HOME_PATH/ann_pyspark.py ${version} ${strategy}
YARN extracts the archive passed with --archives into each container's working directory under a directory named after the archive file (here project_env.zip), which is why PYSPARK_PYTHON is set to ./project_env.zip/bin/python above.
LD_LIBRARY_PATH
If importing some packages fails because libffi.so.6 cannot be found, set LD_LIBRARY_PATH (the two --conf lines above) so the job borrows this dependency from the old Python environment, e.g. /opt/rh/python27/root/usr/lib64.
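A quick way to check whether the library is resolvable for the Python that actually runs in the containers is to try loading it with ctypes, which honors LD_LIBRARY_PATH. This is a minimal sketch, not part of the original job:
#coding=utf-8
# Try to dlopen libffi.so.6; if this raises OSError, LD_LIBRARY_PATH
# does not point at a directory that contains the library.
import ctypes

try:
    ctypes.CDLL("libffi.so.6")
    print("libffi.so.6 loaded OK")
except OSError as e:
    print("libffi.so.6 not resolvable: %s" % e)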
python demo
#coding=utf-8
from annoy import AnnoyIndex
import pickle
import re
import numpy as np
import json
import traceback
import sys
from pyspark.sql import SparkSession
from pyspark import SparkFiles
def do(iter):
    def do_line(input):
        try:
            # per-line processing goes here; the body is omitted in the original post
            return input
        except:
            traceback.print_exc()
            return ""
    return (do_line(x) for x in iter)
if __name__ == "__main__":
    version = sys.argv[1]
    strategy = sys.argv[2]
    hadoop_path = ""
    # path1, path2 (side files), path3 (input) and output_path (output) are
    # HDFS paths that the original post leaves unspecified
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", 5000)
    spark.conf.set("spark.hadoopRDD.ignoreEmptySplits", True)
    spark.conf.set("spark.hadoopRDD.targetBytesInPartition", 67108864)
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    # ship the HDFS files to every executor so they can be read locally via SparkFiles
    sc.addFile(path1)
    sc.addFile(path2)
    result = sc.textFile(path3).repartition(500).mapPartitions(do)
    result.saveAsTextFile(output_path)
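The body of do_line is elided above. As an illustration of how the imported pieces (sc.addFile, SparkFiles, AnnoyIndex, pickle, json) would typically fit together in this kind of nearest-neighbour job, here is a hedged sketch; the file names ann.index and id_map.pkl, the vector dimension 128, the angular metric, and the JSON line format are assumptions, not taken from the original post.
#coding=utf-8
# Hypothetical per-partition logic; file names, dimension and input format are assumptions.
from annoy import AnnoyIndex
from pyspark import SparkFiles
import json
import pickle
import traceback

def do(iter):
    # load the side files once per partition; they were shipped with sc.addFile
    index = AnnoyIndex(128, "angular")          # 128 is an assumed vector dimension
    index.load(SparkFiles.get("ann.index"))     # assumed name of the annoy index file (path1)
    with open(SparkFiles.get("id_map.pkl"), "rb") as f:
        id_map = pickle.load(f)                 # assumed mapping from annoy id to item id (path2)

    def do_line(input):
        try:
            record = json.loads(input)          # assumed: one JSON object per input line
            ids = index.get_nns_by_vector(record["vector"], 10)
            return json.dumps({"id": record["id"],
                               "neighbors": [id_map[i] for i in ids]})
        except:
            traceback.print_exc()
            return ""
    return (do_line(x) for x in iter)
Loading the index inside do rather than inside do_line keeps the load to once per partition, which is the usual reason for using mapPartitions here.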