pyspark

Author: hehehehe | Published 2021-09-23 13:03
JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home"
#JAVA_HOME="/Users/xxx/Downloads/soft/jdk-16.0.2.jdk/Contents/Home"
export JAVA_HOME
CLASS_PATH="$JAVA_HOME/lib"
PATH=".$PATH:$JAVA_HOME/bin"


SPARK_HOME="/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7"
export PATH=${PATH}:${SPARK_HOME}/bin
export PYTHONPATH=${SPARK_HOME}/python

PyCharm
https://blog.csdn.net/ringsuling/article/details/84448369
In File → Settings → Project Structure, click "Add Content Root" on the right and add the paths of py4j-some-version.zip and pyspark.zip (both live under the python/lib folder of the Spark installation).
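Alternatively, a minimal sketch (assuming the SPARK_HOME path from the profile above) that puts the same two zips on sys.path at runtime instead of configuring PyCharm content roots:

import glob
import os
import sys

# Assumes the SPARK_HOME from the profile above; adjust to your install.
spark_home = "/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7"
os.environ.setdefault("SPARK_HOME", spark_home)

# pyspark.zip and py4j-*-src.zip live under $SPARK_HOME/python/lib
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.extend(glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")))

import pyspark  # should now resolve without the PyCharm content-root step
print(pyspark.__version__)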


spark-submit --master yarn \
           --deploy-mode cluster\
           --executor-cores 4\
           --num-executors 4 \
           --executor-memory 20g \
           --driver-memory 20g \
           --conf spark.default.parallelism=148 \
           --conf spark.driver.maxResultSize=10G\
           --conf spark.speculation=true\
           --conf spark.speculation.quantile=0.9 \
           --conf spark.speculation.multiplier=1.5 \
           --conf spark.sql.hive.convertMetastoreParquet=false \
           --archives hdfs:///user/be/py3.zip#py3 \
           --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
            /data/sll/code/spark_test.py
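A minimal sketch (not part of the original job) that can be dropped into the submitted script to confirm the executors really use the python3 unpacked from the py3 archive and that spark.default.parallelism took effect:

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Each executor should report the interpreter unpacked from the py3 archive
print(sc.parallelize(range(4), 4).map(lambda _: sys.executable).distinct().collect())
# Should reflect --conf spark.default.parallelism=148
print(sc.defaultParallelism)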

spark-submit --master yarn \
           --deploy-mode cluster\
           --executor-cores 4\
           --num-executors 4 \
           --executor-memory 20g \
           --driver-memory 20g \
           --conf spark.default.parallelism=240 \
           --conf spark.driver.maxResultSize=10G\
           --conf spark.speculation=true\
           --conf spark.speculation.quantile=0.9 \
           --conf spark.speculation.multiplier=1.5 \
           --conf spark.sql.hive.convertMetastoreParquet=false \
           --archives hdfs:///user/be/py3.zip#py3 \
           --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
           --py-files hdfs:///user/be/auto/map-data-poi-match-spark.zip \
           /home/be/match/map-data-poi-match-spark/submit.py --input_diff_dt=$1 --app_name=$2
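submit.py itself is not shown here; a hypothetical argparse sketch of how it might consume the two flags passed above:

import argparse
from pyspark.sql import SparkSession

parser = argparse.ArgumentParser()
parser.add_argument("--input_diff_dt", required=True)  # e.g. a partition date passed as $1
parser.add_argument("--app_name", required=True)       # passed as $2
args = parser.parse_args()

spark = SparkSession.builder.appName(args.app_name).getOrCreate()
# ... job logic keyed on args.input_diff_dt ...
spark.stop()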

spark-submit --master yarn \
    --deploy-mode cluster \
    --driver-memory 20g \
    --num-executors 4 \
    --executor-cores 4 \
    --executor-memory 20g \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.speculation=true \
    --conf spark.speculation.quantile=0.9 \
    --conf spark.speculation.multiplier=1.5 \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --archives hdfs:///user/sll/venv.zip#python_dir \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
    --py-files /data/sll/code2/depend.zip /data/sll/code2/main.py
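The speculation and Hive-Parquet settings can also be set from the script itself, which is handy for local testing; a minimal sketch, assuming LTP_MODEL_DIR is populated as configured above:

import os
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("speculative_job")
         .config("spark.speculation", "true")
         .config("spark.speculation.quantile", "0.9")
         .config("spark.speculation.multiplier", "1.5")
         .config("spark.sql.hive.convertMetastoreParquet", "false")
         .getOrCreate())

# In cluster mode this is populated by spark.yarn.appMasterEnv.LTP_MODEL_DIR
print(os.environ.get("LTP_MODEL_DIR"))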

Note: cast the partition column to a number in the JDBC query, e.g. ,prov_code ::numeric, so it can be used as partitionColumn (see the read options in the script below).

# PYTHONHASHSEED=0 makes Python's hash() deterministic across executor processes
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3

spark-submit --master yarn \
    --deploy-mode cluster \
    --driver-memory 50g \
    --num-executors 4 \
    --executor-cores 4 \
    --executor-memory 50g \
    --name hnApp \
    --archives hdfs:///user/sll/venv.zip#python_dir \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
    --py-files /data/sll/code3/prpcrypt_util.py /data/sll/code3/hn_encrypt.py
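The submitted hn_encrypt.py below reads rows from PostgreSQL over JDBC, encrypts the source_id values with prpcrypt_util, and appends the result to a new table: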

import os
import sys
import json
from collections import OrderedDict
from prpcrypt_util import prpcrypt, key_0826, iv_0826
from pathlib import Path

import requests

proj_path = Path(__file__).parent
print(proj_path)
sys.path.insert(0, str(proj_path))
sys.path.insert(0, str(proj_path.joinpath("depend")))


def source_id_encrypt_func(hn_id, source_id):
    # encrpt_util is the prpcrypt instance created in __main__ below;
    # it is captured in the task closure and shipped to the executors.
    encrpt_sources = []
    if source_id:
        sources = source_id.split('|')
        for source in sources:
            if source and source.startswith('GD_'):
                encrpt_sources.append('A_' + encrpt_util.encrypt(source[3:]))
            elif source and source.startswith('BD_'):
                encrpt_sources.append('B_' + encrpt_util.encrypt(source[3:]))
            else:
                encrpt_sources.append(source)
    return hn_id, "|".join(encrpt_sources)


# Only needed when running locally outside spark-submit; adjust the paths to your install.
os.environ['SPARK_HOME'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7'
os.environ['PYTHONPATH'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python'
if __name__ == '__main__':
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, StructField, FloatType, TimestampType

    from_database = "poi_product"
    from_user = "xx"
    from_pw = "xx"
    from_host = "192.168.160.12"
    from_port = "15450"
    jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
    connectionProperties = {
        "user": from_user,
        "password": from_pw,
        "driver": "org.postgresql.Driver"
    }

    spark = SparkSession.builder.appName('hnApp').getOrCreate()

    sql = "(select hn_id,source_id,prov_code ::numeric from poi_hn_edit_0826 limit 10) tmp"
    df = spark.read \
        .option('numPartitions', 30) \
        .option('partitionColumn', 'prov_code') \
        .option('lowerBound', '110000') \
        .option('upperBound', '650000') \
        .jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
    df.show(2)
    encrpt_util = prpcrypt(key_0826, iv_0826)

    rdd2 = df.rdd.map(lambda x: source_id_encrypt_func(x['hn_id'], x['source_id']))
    # print(rdd2.take(2))
    df2 = spark.createDataFrame(rdd2, ['hn_id', 'source_id'])
    df2.show(2)

    df2.write.jdbc(url=jdbcUrl, table='poi_hn_edit_0826_decrypt', mode='append', properties=connectionProperties)

    spark.stop()
