spark-submit

Author: hehehehe | Published 2021-09-25 00:23

    In cluster mode, files shipped with --files end up in the YARN container's working
    directory. The debug prints below show where the script and cfg.yaml land:
        print("cfg_file_base", Path(__file__).absolute())
        lists = os.listdir(".")
        print(lists)
        print(Path(__file__).parent.absolute())
        print(Path(__file__).parent.parent.absolute())
        cfg_file = Path(__file__).parent.parent.joinpath('cfg.yaml')
        print("cfg_file", cfg_file.absolute())
    
        cfg = yaml.load(Path('cfg.yaml').read_text(), Loader=yaml.FullLoader)
    
    cfg_file_base /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/address_format_normal.py
    ['tmp',  'depend.zip', '__spark_conf__', 'cfg.yaml', 'python_dir', 'address_format_normal.py']
    /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
    /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
    cfg_file /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/cfg.yaml
    
    cfg_file = Path('cfg.yaml')
    cfg = yaml.load(cfg_file.read_text(), Loader=yaml.FullLoader)
    print(cfg)

    spark = SparkSession.builder \
        .appName('appName') \
        .getOrCreate()

    # print the distributed cfg.yaml line by line
    with open(cfg_file) as f:
        for line in f:
            print(line)

    spark.stop()
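
    Instead of relying on the bare relative filename, a file shipped with --files can also be
    resolved explicitly through pyspark's SparkFiles helper; a minimal sketch (not from the
    original post, but using the standard API):

    import yaml
    from pathlib import Path
    from pyspark import SparkFiles
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('appName').getOrCreate()

    # SparkFiles.get returns the absolute path of a file distributed via --files / addFile
    cfg_path = SparkFiles.get('cfg.yaml')
    cfg = yaml.load(Path(cfg_path).read_text(), Loader=yaml.FullLoader)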
    

    submit_depend.sh

    export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
    export PYSPARK_PYTHON=./python_dir/venv/bin/python3
    export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --driver-memory 30g \
        --num-executors 4 \
        --executor-cores 5 \
        --executor-memory 30g \
        --name hnApp \
        --archives hdfs:///user/sll/venv.zip#python_dir \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.kryoserializer.buffer.max=256m \
        --conf spark.driver.maxResultSize=10g \
        --conf spark.sql.broadcastTimeout=600 \
        --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
        --files /data/sll/code/cfg.yaml \
        --py-files /data/sll/code/depend.zip \
        /data/sll/code/spark_test.py
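
    To confirm on the cluster that the packed interpreter from venv.zip is really being used,
    a quick diagnostic (a sketch, not part of the original job) is to print the interpreter
    path and the environment variables set above from inside the driver or an executor:

    import os
    import sys

    # with --archives venv.zip#python_dir this should point into ./python_dir/venv/bin/
    print("executable:", sys.executable)
    print("PYSPARK_PYTHON:", os.environ.get("PYSPARK_PYTHON"))
    print("LTP_MODEL_DIR:", os.environ.get("LTP_MODEL_DIR"))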
                                  
    

    submit.sh

    export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
    export PYSPARK_PYTHON=./python_dir/venv/bin/python3
    export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --driver-memory 30g \
        --num-executors 5 \
        --executor-cores 4 \
        --executor-memory 30g \
        --name hnApp \
        --archives hdfs:///user/sll/venv.zip#python_dir \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
        --conf spark.network.timeout=10000000 \
        --py-files /data/sll/code/prpcrypt_util.py,/data/sll/code/cfg.yaml \
         /data/sll/code/spark_test.py
    

    Files used by the code are stored at:
    self.user_dict = './py3/venv/lib/python3.7/site-packages/userDict2.txt'
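
    The hard-coded './py3/...' path only resolves when the archive alias happens to be py3;
    since the submit scripts above unpack the venv as python_dir and already export
    LTP_MODEL_DIR, a more portable variant (a sketch, my assumption rather than the post's
    code) is to derive the path from that variable:

    import os
    from pathlib import Path

    # fall back to the python_dir alias used by --archives venv.zip#python_dir
    model_dir = os.environ.get('LTP_MODEL_DIR', './python_dir/venv/lib/python3.7/site-packages')
    user_dict = str(Path(model_dir) / 'userDict2.txt')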

    zip -r venv.zip venv/
    hdfs dfs -put venv.zip /user/sll/
    zip -r -q depend.zip depend/
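
    Because the job imports the dependencies as a package (from depend.address_parser import
    QueryParser), depend.zip needs a top-level depend/ directory; a quick sanity check before
    shipping it with --py-files (a sketch, not in the original post):

    import zipfile

    # the zip is put on sys.path by --py-files, so the package dir must sit at its root
    with zipfile.ZipFile('depend.zip') as zf:
        assert any(name.startswith('depend/') for name in zf.namelist()), 'depend/ not at zip root'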
    
    The approach that actually works (building the venv from an Anaconda install):

    conda create --name venv python=3.7

    # copy the interpreter and standard library out of the Anaconda install
    # so the packed venv is self-contained when unpacked on the cluster nodes
    cd anaconda3/
    cp -r bin ../venv/
    cd /root/anaconda3/lib
    cp -r python3.7 ../../venv/lib/

    zip -r venv.zip venv/
    hdfs dfs -put venv.zip /user/sll/
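
    Before uploading, it can help to verify that the archive layout matches what the submit
    scripts expect, i.e. venv/bin/python3 inside the zip, which becomes
    ./python_dir/venv/bin/python3 after YARN unpacks it; a minimal sketch:

    import zipfile

    # sanity-check venv.zip against the PYSPARK_PYTHON path used in the submit scripts
    with zipfile.ZipFile('venv.zip') as zf:
        assert 'venv/bin/python3' in zf.namelist(), 'venv/bin/python3 missing from venv.zip'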
    
    
    =================================================================
    
    export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
    export PYSPARK_PYTHON=./python_dir/venv/bin/python3
    export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
    
    
    
    spark-submit --master yarn \
        --deploy-mode cluster \
        --driver-memory 30g \
        --num-executors 10 \
        --executor-cores 10 \
        --executor-memory 30g \
        --name hnApp \
        --jars /data/sll/code/jars/sedona-python-adapter-2.4_2.11-1.1.0-incubating.jar,/data/sll/code/jars/geotools-wrapper-geotools-24.1.jar \
        --archives hdfs:///user/sll/venv.zip#python_dir \
        --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.kryoserializer.buffer.max=256m \
        --conf spark.driver.maxResultSize=10g \
        --conf spark.sql.broadcastTimeout=600 \
        --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
        --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
        --files /data/sll/code/cfg.yaml \
        --py-files /data/sll/code/depend.zip \
        /data/sll/code/sedona_poi_admin2.py                                                                        
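
    sedona_poi_admin2.py itself is not shown here; with the sedona-python-adapter jar on the
    classpath, the Python side would typically also register Sedona's SQL functions. A minimal
    sketch, assuming the apache-sedona package is bundled inside venv.zip:

    from pyspark.sql import SparkSession
    from sedona.register import SedonaRegistrator

    spark = SparkSession.builder.appName('hnApp').getOrCreate()
    SedonaRegistrator.registerAll(spark)  # registers the ST_* SQL functions provided by the jars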
    
    
    import os
    import sys
    import json
    from collections import OrderedDict
    from pathlib import Path
    
    project_path = Path(__file__).parent
    print(project_path)
    sys.path.insert(0, str(project_path))
    sys.path.insert(0, str(project_path.joinpath('depend')))
    
    from depend.address_parser import QueryParser
    
    
    def address_split(address: str, queryParser):
        try:
            if address:
                return queryParser.recognise(address)['entities']
        except Exception as e:
            print("address_split:%s error:%s" % (address, e))
        return None
    
    
    def address_extract(address, queryParser=None):
        # build a parser on demand; callers that reuse one per partition can pass it in
        if queryParser is None:
            queryParser = QueryParser()
        split_address = address_split(address, queryParser)
        # print(split_address)
        if not split_address:
            print(f"address_split error:{address}")
            return "", ""
        format_address, level_dict, index_dict = dict_to_format_address(split_address)
        # print(format_address)
        gate = level_dict.get("12")[0] if level_dict.get("12") else None
        place_num = level_dict.get("26")[0] if level_dict.get("26") else None
        return gate, place_num
        # return 1, 1
    
    
    def dict_to_format_address(address_splits: list):
        if address_splits:
            address_splits_sorted = sorted(address_splits, key=lambda x: x['index'])
            format_address_list = []
            format_address_level_dict = OrderedDict()
            format_address_index_dict = OrderedDict()
            for address_split in address_splits_sorted:
                level = str(address_split['level'])
                format_address_list.append(f"{address_split['word']},{level}")
                format_address_index_dict[str(address_split['index'])] = (level, address_split['word'])
                if level in format_address_level_dict:
                    format_address_level_dict[level].append(address_split['word'])
                else:
                    format_address_level_dict[level] = [address_split['word']]
            return "|".join(format_address_list), format_address_level_dict, format_address_index_dict
    
    
    def gate_extract_tuple(hn_id, address):
        # per-row variant, used with rdd.map
        print(hn_id, address)
        gate, place_num = address_extract(address)
        return hn_id, gate, place_num


    def gate_extract_partition(rows):
        # per-partition variant for rdd.mapPartitions: builds QueryParser once per partition
        queryParser = QueryParser()
        result = []
        for row in rows:
            gate, place_num = address_extract(row[1], queryParser)
            result.append([row[0], gate, place_num])
        return result
    
    # local test settings; on the cluster, spark-submit and the YARN environment take over
    os.environ['SPARK_HOME'] = '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7'
    os.environ['PYTHONPATH'] = '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python'
    if __name__ == '__main__':
        # print(gate_extract_tuple((12, "河南省焦作市武陟县S104")))
        from pyspark.sql import SparkSession
        from pyspark.sql.types import StructType, StringType, StructField
    
        from_database = "poi_product"
        from_user = "xxx"
        from_pw = "xxx"
        from_host = "192.168.160.12"
        from_port = "15450"
        jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
        connectionProperties = {
            "user": from_user,
            "password": from_pw,
            "driver": "org.postgresql.Driver"
        }
    
        spark = SparkSession.builder.appName('hnApp').getOrCreate()
    
        sql = "(select hn_id,address from poi_hn_edit_0826 limit 20) tmp"
        df = spark.read.jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
        df.show(2)
    
        rdd2 = df.rdd.map(lambda e: gate_extract_tuple(e['hn_id'], e['address']))
        # alternative: build the parser once per partition instead of once per row
        # rdd2 = df.rdd.mapPartitions(gate_extract_partition)
        gate_schema = StructType([
            StructField("hn_id", StringType(), True),
            StructField("gate", StringType(), True),
            StructField("place_num", StringType(), True)
        ])
        print(rdd2.take(2))
        df2 = spark.createDataFrame(rdd2, gate_schema)
        df2.show(2)
    
        df2.write.jdbc(url=jdbcUrl, table='hn_gate_2', mode='append', properties=connectionProperties)
    
        spark.stop()
    
    
