JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home"
#JAVA_HOME="/Users/xxx/Downloads/soft/jdk-16.0.2.jdk/Contents/Home"
export JAVA_HOME
CLASS_PATH="$JAVA_HOME/lib"
PATH=".$PATH:$JAVA_HOME/bin"
SPARK_HOME="/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7"
export PATH=${PATH}:${SPARK_HOME}/bin
export PYTHONPATH=${SPARK_HOME}/python
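To confirm these variables take effect, source the profile and check the bundled launchers (a quick sanity check, not part of the original setup):
source ~/.bash_profile    # or wherever the exports above live
java -version             # should report 1.8.0_291
pyspark --version         # should report Spark 3.0.3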
PyCharm setup
Reference: https://blog.csdn.net/ringsuling/article/details/84448369
In File -> Settings -> Project Structure, click "Add Content Root" on the right and add the paths of py4j-some-version.zip and pyspark.zip (both files live in the python folder under the Spark installation directory).
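For reference, both archives sit under $SPARK_HOME/python/lib; the py4j file name depends on the Spark release (for Spark 3.0.3 it should be py4j-0.10.9-src.zip):
ls $SPARK_HOME/python/lib
# py4j-0.10.9-src.zip  pyspark.zip   <- add both as content roots in PyCharm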
spark-submit --master yarn \
--deploy-mode cluster \
--executor-cores 4 \
--num-executors 4 \
--executor-memory 20g \
--driver-memory 20g \
--conf spark.default.parallelism=148 \
--conf spark.driver.maxResultSize=10G \
--conf spark.speculation=true \
--conf spark.speculation.quantile=0.9 \
--conf spark.speculation.multiplier=1.5 \
--conf spark.sql.hive.convertMetastoreParquet=false \
--archives hdfs:///user/be/py3.zip#py3 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
/data/sll/code/spark_test.py
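spark_test.py itself is not reproduced in these notes; a minimal stand-in that exercises the shipped ./py3 interpreter could look like this (hypothetical, for illustration only):
# spark_test.py - hypothetical smoke test for the YARN + python3 archive setup
import sys
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('spark_test').getOrCreate()
    # confirm which Python interpreter the driver container picked up
    print("driver python:", sys.executable)
    # trivial job to exercise the executors
    df = spark.range(0, 1000)
    print("count:", df.count())
    spark.stop()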
spark-submit --master yarn \
--deploy-mode cluster \
--executor-cores 4 \
--num-executors 4 \
--executor-memory 20g \
--driver-memory 20g \
--conf spark.default.parallelism=240 \
--conf spark.driver.maxResultSize=10G \
--conf spark.speculation=true \
--conf spark.speculation.quantile=0.9 \
--conf spark.speculation.multiplier=1.5 \
--conf spark.sql.hive.convertMetastoreParquet=false \
--archives hdfs:///user/be/py3.zip#py3 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
--py-files hdfs:///user/be/auto/map-data-poi-match-spark.zip \
/home/be/match/map-data-poi-match-spark/submit.py --input_diff_dt=$1 --app_name=$2
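submit.py is driven by the two positional shell arguments ($1 and $2). A sketch of how it might consume them, assuming standard argparse handling (not the original file):
# argument handling sketch for submit.py (hypothetical)
import argparse
from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument('--input_diff_dt', required=True)   # e.g. a partition date
parser.add_argument('--app_name', required=True)
args = parser.parse_args()

spark = SparkSession.builder.appName(args.app_name).getOrCreate()
print("processing diff dt:", args.input_diff_dt)
spark.stop()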
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 20g \
--num-executors 4 \
--executor-cores 4 \
--executor-memory 20g \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.speculation=true \
--conf spark.speculation.quantile=0.9 \
--conf spark.speculation.multiplier=1.5 \
--conf spark.sql.hive.convertMetastoreParquet=false \
--archives hdfs:///user/sll/venv.zip#python_dir \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
--py-files /data/sll/code2/depend.zip /data/sll/code2/main.py
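For reference, venv.zip must unzip into a top-level venv/ directory so that ./python_dir/venv/bin/python3 exists on the cluster nodes; it is typically built from a self-contained virtualenv (created with copied binaries or packed with a tool such as venv-pack) and uploaded to HDFS, while depend.zip holds the job's extra modules. A rough sketch, with hypothetical local paths and a hypothetical requirements.txt:
python3 -m venv --copies venv && ./venv/bin/pip install -r requirements.txt   # self-contained env
zip -r venv.zip venv/                                                          # keep the top-level venv/ dir
hdfs dfs -put -f venv.zip /user/sll/venv.zip                                   # matches --archives ...#python_dir
zip -r depend.zip depend/                                                      # extra modules for --py-files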
Note: the source query casts the partition column with ",prov_code ::numeric" so that the JDBC reader can partition on it (see the sql string in the script below).
# fix the hash seed so Python 3 string hashing is consistent across executors
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 50g \
--num-executors 4 \
--executor-cores 4 \
--executor-memory 50g \
--name hnApp \
--archives hdfs:///user/sll/venv.zip#python_dir \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
--py-files /data/sll/code3/prpcrypt_util.py /data/sll/code3/hn_encrypt.py
import os
import sys
import json
from collections import OrderedDict
from prpcrypt_util import prpcrypt, key_0826, iv_0826
from pathlib import Path
import requests

# make the script directory and its depend/ folder importable
proj_path = Path(__file__).parent
print(proj_path)
sys.path.insert(0, str(proj_path))
sys.path.insert(0, str(proj_path.joinpath("depend")))


def source_id_encrypt_func(hn_id, source_id):
    """Encrypt each '|'-separated source id, replacing the GD_/BD_ prefix with A_/B_."""
    encrpt_sources = []
    if source_id:
        sources = source_id.split('|')
        for source in sources:
            if source and source.startswith('GD_'):
                encrpt_sources.append('A_' + encrpt_util.encrypt(source[3:]))
            elif source and source.startswith('BD_'):
                encrpt_sources.append('B_' + encrpt_util.encrypt(source[3:]))
            else:
                encrpt_sources.append(source)
    return hn_id, "|".join(encrpt_sources)


# local-run settings; on YARN these are supplied by spark-submit instead
os.environ['SPARK_HOME'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7'
os.environ['PYTHONPATH'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python'

if __name__ == '__main__':
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, StructField, FloatType, TimestampType

    from_database = "poi_product"
    from_user = "xx"
    from_pw = "xx"
    from_host = "192.168.160.12"
    from_port = "15450"
    jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
    connectionProperties = {
        "user": from_user,
        "password": from_pw,
        "driver": "org.postgresql.Driver"
    }
    spark = SparkSession.builder.appName('hnApp').getOrCreate()

    # prov_code is cast to numeric so it can serve as the JDBC partition column
    sql = "(select hn_id,source_id,prov_code ::numeric from poi_hn_edit_0826 limit 10) tmp"
    df = spark.read \
        .option('numPartitions', 30) \
        .option('partitionColumn', 'prov_code') \
        .option('lowerBound', '110000') \
        .option('upperBound', '650000') \
        .jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
    df.show(2)

    # module-level global picked up by source_id_encrypt_func when it runs on the executors
    encrpt_util = prpcrypt(key_0826, iv_0826)
    rdd2 = df.rdd.map(lambda x: source_id_encrypt_func(x['hn_id'], x['source_id']))
    # print(rdd2.take(2))
    df2 = spark.createDataFrame(rdd2, ['hn_id', 'source_id'])
    df2.show(2)
    df2.write.jdbc(url=jdbcUrl, table='poi_hn_edit_0826_decrypt', mode='append', properties=connectionProperties)
    spark.stop()
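The prpcrypt_util module shipped via --py-files is not reproduced in these notes. A hypothetical sketch of such an AES helper is shown below; the real key_0826 / iv_0826 values are not part of these notes, and the ones here are placeholders only (requires pycryptodome):
# prpcrypt_util.py - hypothetical sketch of the AES-CBC helper imported above
import binascii
from Crypto.Cipher import AES

key_0826 = b'0123456789abcdef'   # placeholder 16-byte key, not the original
iv_0826 = b'fedcba9876543210'    # placeholder 16-byte IV, not the original


class prpcrypt:
    def __init__(self, key, iv):
        self.key = key
        self.iv = iv

    def encrypt(self, text):
        data = text.encode('utf-8')
        pad_len = 16 - len(data) % 16
        data += bytes([pad_len]) * pad_len                # PKCS#7 padding to the AES block size
        cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
        return binascii.b2a_hex(cipher.encrypt(data)).decode('utf-8')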