pyspark

    JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home"
    #JAVA_HOME="/Users/xxx/Downloads/soft/jdk-16.0.2.jdk/Contents/Home"
    export JAVA_HOME
    CLASS_PATH="$JAVA_HOME/lib"
    PATH=".:$PATH:$JAVA_HOME/bin"
    export CLASS_PATH PATH
    
    
    SPARK_HOME="/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7"
    export PATH=${PATH}:${SPARK_HOME}/bin
    export PYTHONPATH=${SPARK_HOME}/python
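
    With the variables above exported, the installation can be smoke-tested from a plain Python shell. A minimal sketch (it additionally assumes py4j is importable, e.g. installed with pip or added from $SPARK_HOME/python/lib/py4j-*-src.zip):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("smoke-test").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
    df.show()                 # two rows printed means Java and Spark are wired up
    print(spark.version)      # e.g. 3.0.3
    spark.stop()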
    

    PyCharm
    https://blog.csdn.net/ringsuling/article/details/84448369
    In PyCharm, open File → Settings → Project Structure, click "Add Content Root" on the right, and add the paths to py4j-some-version.zip and pyspark.zip (both files live in the python folder of the Spark installation).
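
    If you prefer not to change the PyCharm project structure, roughly the same effect can be achieved at run time by putting the two archives on sys.path; a small sketch (the py4j file name is version-dependent, py4j-0.10.9-src.zip in Spark 3.0.x):

    import glob
    import os
    import sys

    spark_home = os.environ.get("SPARK_HOME", "/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7")
    sys.path.insert(0, os.path.join(spark_home, "python"))
    # the py4j zip name is versioned, so glob for it instead of hard-coding it
    sys.path.extend(glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")))

    import pyspark  # should now resolve without any IDE-level configuration
    print(pyspark.__version__)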

    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --executor-cores 4 \
        --num-executors 4 \
        --executor-memory 20g \
        --driver-memory 20g \
        --conf spark.default.parallelism=148 \
        --conf spark.driver.maxResultSize=10G \
        --conf spark.speculation=true \
        --conf spark.speculation.quantile=0.9 \
        --conf spark.speculation.multiplier=1.5 \
        --conf spark.sql.hive.convertMetastoreParquet=false \
        --archives hdfs:///user/be/py3.zip#py3 \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
        /data/sll/code/spark_test.py
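
    spark_test.py itself is not shown; a hypothetical sketch of a job that verifies the executors are really using the interpreter shipped in py3.zip could look like this:

    # Hypothetical spark_test.py: check which Python the driver and executors run.
    import sys

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("py3-env-check").getOrCreate()
    sc = spark.sparkContext

    print("driver python:", sys.executable)

    # sys.executable as seen on the executors; it should point at ./py3/bin/python3
    executor_pythons = (
        sc.parallelize(range(sc.defaultParallelism))
          .map(lambda _: sys.executable)
          .distinct()
          .collect()
    )
    print("executor python:", executor_pythons)

    spark.stop()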
    
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --executor-cores 4 \
        --num-executors 4 \
        --executor-memory 20g \
        --driver-memory 20g \
        --conf spark.default.parallelism=240 \
        --conf spark.driver.maxResultSize=10G \
        --conf spark.speculation=true \
        --conf spark.speculation.quantile=0.9 \
        --conf spark.speculation.multiplier=1.5 \
        --conf spark.sql.hive.convertMetastoreParquet=false \
        --archives hdfs:///user/be/py3.zip#py3 \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./py3/bin/python3 \
        --py-files hdfs:///user/be/auto/map-data-poi-match-spark.zip \
        /home/be/match/map-data-poi-match-spark/submit.py --input_diff_dt=$1 --app_name=$2
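
    submit.py receives --input_diff_dt and --app_name from the wrapper script's $1 and $2; its argument handling is not shown here, but a minimal hypothetical skeleton would be:

    # Hypothetical skeleton of submit.py: parse the two flags passed through $1 and $2.
    import argparse

    from pyspark.sql import SparkSession


    def parse_args():
        parser = argparse.ArgumentParser()
        parser.add_argument("--input_diff_dt", required=True, help="diff partition date, e.g. 20210923")
        parser.add_argument("--app_name", required=True, help="Spark application name")
        return parser.parse_args()


    if __name__ == "__main__":
        args = parse_args()
        spark = SparkSession.builder.appName(args.app_name).getOrCreate()
        # ... the actual POI matching logic would read the partition given by args.input_diff_dt ...
        spark.stop()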
    
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --driver-memory 20g \
        --num-executors 4 \
        --executor-cores 4 \
        --executor-memory 20g \
        --conf spark.dynamicAllocation.enabled=false \
        --conf spark.speculation=true \
        --conf spark.speculation.quantile=0.9 \
        --conf spark.speculation.multiplier=1.5 \
        --conf spark.sql.hive.convertMetastoreParquet=false \
        --archives hdfs:///user/sll/venv.zip#python_dir \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
        --py-files /data/sll/code2/depend.zip /data/sll/code2/main.py
    

    Note: the JDBC query used further below selects prov_code with an explicit cast (,prov_code ::numeric) so the column is numeric and can serve as the partitionColumn of the parallel read.

    export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
    export PYSPARK_PYTHON=./python_dir/venv/bin/python3
    export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
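
    The PYTHONHASHSEED=0 export matters because Python 3 salts str hashing per interpreter start, so without a fixed seed different executor processes disagree on hash values and PySpark aborts string-hashing operations. A small local illustration (plain Python, not Spark-specific):

    import os
    import subprocess
    import sys

    code = 'print(hash("spark"))'
    env = {**os.environ, "PYTHONHASHSEED": "0"}

    # With the seed pinned, two separate interpreter processes agree on the hash.
    first = subprocess.check_output([sys.executable, "-c", code], env=env).decode().strip()
    second = subprocess.check_output([sys.executable, "-c", code], env=env).decode().strip()
    print(first, second, first == second)   # same value twice -> True

    # Drop PYTHONHASHSEED from env and the two values will usually differ between runs.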
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --driver-memory 50g \
        --num-executors 4 \
        --executor-cores 4 \
        --executor-memory 50g \
        --name hnApp \
        --archives hdfs:///user/sll/venv.zip#python_dir \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
        --py-files /data/sll/code3/prpcrypt_util.py /data/sll/code3/hn_encrypt.py
    
    import os
    import sys
    import json
    from collections import OrderedDict
    from prpcrypt_util import prpcrypt, key_0826, iv_0826
    from pathlib import Path
    
    import requests
    
    proj_path = Path(__file__).parent
    print(proj_path)
    sys.path.insert(0, str(proj_path))
    sys.path.insert(0, str(proj_path.joinpath("depend")))
    
    
    def source_id_encrypt_func(hn_id, source_id):
        # Encrypt each '|'-separated source id: 'GD_xxx' becomes 'A_' + cipher, 'BD_xxx'
        # becomes 'B_' + cipher, anything else is passed through unchanged.
        # encrpt_util is the module-level prpcrypt instance created in __main__ below.
        encrpt_sources = []
        if source_id:
            sources = source_id.split('|')
            for source in sources:
                if source and source.startswith('GD_'):
                    encrpt_sources.append('A_' + encrpt_util.encrypt(source[3:]))
                elif source and source.startswith('BD_'):
                    encrpt_sources.append('B_' + encrpt_util.encrypt(source[3:]))
                else:
                    encrpt_sources.append(source)
        return hn_id, "|".join(encrpt_sources)
    
    
    # Local-run settings: point the script at a local Spark installation. When the job is
    # submitted to YARN with spark-submit, these two lines are unnecessary.
    os.environ['SPARK_HOME'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7'
    os.environ['PYTHONPATH'] = '/Users/xx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python'
    if __name__ == '__main__':
        from pyspark.sql import SparkSession
        from pyspark.sql.types import StructType, StringType, StructField, FloatType, TimestampType
    
        from_database = "poi_product"
        from_user = "xx"
        from_pw = "xx"
        from_host = "192.168.160.12"
        from_port = "15450"
        jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
        connectionProperties = {
            "user": from_user,
            "password": from_pw,
            "driver": "org.postgresql.Driver"
        }
    
        spark = SparkSession.builder.appName('hnApp').getOrCreate()
    
        sql = "(select hn_id,source_id,prov_code ::numeric from poi_hn_edit_0826 limit 10) tmp"
        df = spark.read \
            .option('numPartitions', 30) \
            .option('partitionColumn', 'prov_code') \
            .option('lowerBound', '110000') \
            .option('upperBound', '650000') \
            .jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
        df.show(2)
        encrpt_util = prpcrypt(key_0826, iv_0826)
    
        rdd2 = df.rdd.map(lambda x: source_id_encrypt_func(x['hn_id'], x['source_id']))
        # print(rdd2.take(2))
        df2 = spark.createDataFrame(rdd2, ['hn_id', 'source_id'])
        df2.show(2)
    
        df2.write.jdbc(url=jdbcUrl, table='poi_hn_edit_0826_decrypt', mode='append', properties=connectionProperties)
    
        spark.stop()
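
    Because prpcrypt_util and the key/iv constants are not shown, the mapping logic of source_id_encrypt_func can still be checked locally with a stand-in cipher; a hypothetical sketch, meant to run in the same module as the script above:

    # Hypothetical local check of the mapping logic (StubCrypt stands in for prpcrypt,
    # so no key material or cluster is needed).
    class StubCrypt:
        def encrypt(self, text):
            return "enc(" + text + ")"

    encrpt_util = StubCrypt()  # source_id_encrypt_func looks this name up as a module global

    print(source_id_encrypt_func("HN_1", "GD_123|BD_456|OSM_789"))
    # expected: ('HN_1', 'A_enc(123)|B_enc(456)|OSM_789')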
    
    
