# Debug output: show where this script and the current working directory's
# files are located at runtime.
print("cfg_file_base", Path(__file__).absolute())
# Names of the entries in the current working directory.
lists = os.listdir(".")
print(lists)
# Directory containing this script, then that directory's parent.
print(Path(__file__).parent.absolute())
print(Path(__file__).parent.parent.absolute())
# Candidate config path in the parent of the script's directory; it is only
# printed here and is not the path used for loading below.
cfg_file = Path(__file__).parent.parent.joinpath('cfg.yaml')
print("cfg_file", cfg_file.absolute())
# The config is read from 'cfg.yaml' relative to the current working directory.
# NOTE(review): yaml.load is used with FullLoader; confirm cfg.yaml is trusted
# input, otherwise yaml.safe_load is the safer choice.
cfg = yaml.load(Path('cfg.yaml').read_text(), Loader=yaml.FullLoader)
cfg_file_base /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/address_format_normal.py
['tmp', 'depend.zip', '__spark_conf__', 'cfg.yaml', 'python_dir', 'address_format_normal.py']
/data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
/data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
cfg_file /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/cfg.yaml
# Load the config from 'cfg.yaml' relative to the current working directory.
cfg_file = Path('cfg.yaml')
# NOTE(review): yaml.load is used with FullLoader; if cfg.yaml can ever come
# from an untrusted source, switch to yaml.safe_load.
cfg = yaml.load(cfg_file.read_text(), Loader=yaml.FullLoader)
print(cfg)
spark = SparkSession.builder \
    .appName('appName') \
    .getOrCreate()
try:
    # Dump the raw file line by line; the context manager closes the handle
    # even if printing fails.
    with open(cfg_file) as file:
        for line in file:
            print(line)
finally:
    # Always release the Spark session, including on error.
    spark.stop()
submit_depend.sh
# Interpreter settings: PySpark is pointed at the python3 binary inside the
# archive that --archives exposes under the alias "python_dir".
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
# Submit spark_test.py to YARN in cluster mode.
#   --archives : venv.zip from HDFS, aliased as python_dir
#   --files    : ships cfg.yaml alongside the job
#   --py-files : ships the depend.zip dependency bundle
# The spark.yarn.appMasterEnv.* settings pass the same interpreter paths (and
# LTP_MODEL_DIR) to the YARN application master.
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 30g \
--num-executors 4 \
--executor-cores 5 \
--executor-memory 30g \
--name hnApp \
--archives hdfs:///user/sll/venv.zip#python_dir \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.driver.maxResultSize=10g \
--conf spark.sql.broadcastTimeout=600 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
--files /data/sll/code/cfg.yaml \
--py-files /data/sll/code/depend.zip \
/data/sll/code/spark_test.py
submit.sh
# Interpreter settings: PySpark is pointed at the python3 binary inside the
# archive that --archives exposes under the alias "python_dir".
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
# Submit spark_test.py to YARN in cluster mode, shipping a single helper module
# (prpcrypt_util.py) instead of a dependency zip.
# NOTE(review): cfg.yaml is passed through --py-files here; that option is
# documented for .zip/.egg/.py files, so --files may be the intended option for
# a YAML config - confirm.
# NOTE(review): spark.network.timeout is given without a unit suffix - confirm
# the intended unit and magnitude.
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 30g \
--num-executors 5 \
--executor-cores 4 \
--executor-memory 30g \
--name hnApp \
--archives hdfs:///user/sll/venv.zip#python_dir \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
--conf spark.network.timeout=10000000 \
--py-files /data/sll/code/prpcrypt_util.py,/data/sll/code/cfg.yaml \
/data/sll/code/spark_test.py
代码用到的文件存放于
self.user_dict = './py3/venv/lib/python3.7/site-packages/userDict2.txt'
# Archive the venv/ directory recursively and upload the archive to HDFS.
zip -r venv.zip venv/
hdfs dfs -put venv.zip /user/sll/
# Archive the depend/ directory recursively, in quiet mode (-q).
zip -r -q depend.zip depend/
Correct method (building the venv archive from a conda environment):
# Create a conda environment named "venv" with Python 3.7.
conda create --name venv python=3.7
zip -r venv.zip venv
# Copy Anaconda's bin/ directory and its python3.7 library tree into venv/.
# NOTE(review): the relative paths assume anaconda3/ and venv/ are siblings
# under /root - confirm.
cd anaconda3/
cp -r bin ../venv/
cd /root/anaconda3/lib
cp -r python3.7 ../../venv/lib/
# Rebuild the archive and upload it to HDFS.
# NOTE(review): this zip runs from /root/anaconda3/lib, whereas the copies
# above target ../../venv - confirm the working directory before zipping.
zip -r venv.zip venv/
hdfs dfs -put venv.zip /user/sll/
=================================================================
# Interpreter settings: PySpark is pointed at the python3 binary inside the
# archive that --archives exposes under the alias "python_dir".
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3
# Submit sedona_poi_admin2.py to YARN in cluster mode.
#   --jars     : Sedona python adapter and GeoTools wrapper JARs
#   --archives : venv.zip from HDFS, aliased as python_dir
#   --files    : ships cfg.yaml alongside the job
#   --py-files : ships the depend.zip dependency bundle
spark-submit --master yarn \
--deploy-mode cluster \
--driver-memory 30g \
--num-executors 10 \
--executor-cores 10 \
--executor-memory 30g \
--name hnApp \
--jars /data/sll/code/jars/sedona-python-adapter-2.4_2.11-1.1.0-incubating.jar,/data/sll/code/jars/geotools-wrapper-geotools-24.1.jar \
--archives hdfs:///user/sll/venv.zip#python_dir \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.driver.maxResultSize=10g \
--conf spark.sql.broadcastTimeout=600 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
--conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
--files /data/sll/code/cfg.yaml \
--py-files /data/sll/code/depend.zip \
/data/sll/code/sedona_poi_admin2.py
import os
import sys
import json
from collections import OrderedDict
from pathlib import Path
# Put the script's directory and its 'depend' sub-directory at the front of
# sys.path ('depend' first) so the project import below resolves.
project_path = Path(__file__).parent
print(project_path)
sys.path[:0] = [str(project_path.joinpath('depend')), str(project_path)]
from depend.address_parser import QueryParser
# Sentinel that distinguishes "address argument omitted" from a NULL (None)
# address value coming from the data.
_ADDRESS_NOT_GIVEN = object()


def address_split(address: str, queryParser):
    """Parse *address* and return the parser's recognised entities.

    Args:
        address: Raw address text. Falsy values are not parsed.
        queryParser: Object exposing ``recognise(address)`` whose result is
            indexed with ``'entities'``.

    Returns:
        The ``'entities'`` value of the parser result, or ``None`` when
        *address* is falsy or parsing raised an exception.
    """
    if not address:
        return None
    try:
        return queryParser.recognise(address)['entities']
    except Exception as e:
        # Best effort: one unparsable address must not abort the whole job,
        # but the cause is reported so failures can be diagnosed.
        print(f"address_split:{address} error:{e}")
        return None


def _first_word(level_dict, level):
    """Return the first word stored under *level*, or ``None`` if absent."""
    words = level_dict.get(level)
    return words[0] if words else None


def address_extract(address, queryParser=None):
    """Extract the gate and place number from an address string.

    Args:
        address: Raw address text.
        queryParser: Optional parser to reuse. When omitted a new
            ``QueryParser`` is constructed for this call, so callers that
            process many addresses should pass a shared instance.

    Returns:
        ``(gate, place_num)``: the first words of levels ``"12"`` and ``"26"``
        respectively (``None`` for a level that is missing), or ``("", "")``
        when the address could not be split.
    """
    if queryParser is None:
        queryParser = QueryParser()
    split_address = address_split(address, queryParser)
    if not split_address:
        print(f"address_split error:{address}")
        return "", ""
    _, level_dict, _ = dict_to_format_address(split_address)
    gate = _first_word(level_dict, "12")
    place_num = _first_word(level_dict, "26")
    return gate, place_num


def dict_to_format_address(address_splits: list):
    """Summarise parsed address entities, ordered by their ``'index'``.

    Args:
        address_splits: Dicts each carrying ``'index'``, ``'level'`` and
            ``'word'`` keys.

    Returns:
        ``None`` when *address_splits* is empty or ``None``; otherwise a
        3-tuple of:
          * ``"word,level"`` items joined with ``"|"``,
          * an ``OrderedDict`` mapping level (str) to the list of its words,
          * an ``OrderedDict`` mapping index (str) to ``(level, word)``.
    """
    if not address_splits:
        return None
    formatted = []
    words_by_level = OrderedDict()
    entry_by_index = OrderedDict()
    for entry in sorted(address_splits, key=lambda x: x['index']):
        level = str(entry['level'])
        word = entry['word']
        formatted.append(f"{word},{level}")
        entry_by_index[str(entry['index'])] = (level, word)
        words_by_level.setdefault(level, []).append(word)
    return "|".join(formatted), words_by_level, entry_by_index


def gate_extract_partition(rows, queryParser=None):
    """Extract gate and place number for every row of an iterable.

    A single parser is shared across all rows, which suits per-partition use
    such as ``rdd.mapPartitions``.

    Args:
        rows: Iterable of indexable rows with the id at position 0 and the
            address at position 1.
        queryParser: Optional parser to reuse; a new ``QueryParser`` is
            constructed once when omitted.

    Returns:
        A list of ``[hn_id, gate, place_num]`` lists, one per input row.
    """
    if queryParser is None:
        queryParser = QueryParser()
    result = []
    for row in rows:
        gate, place_num = address_extract(row[1], queryParser)
        result.append([row[0], gate, place_num])
    return result


def gate_extract_tuple(hn_id, address=_ADDRESS_NOT_GIVEN, queryParser=None):
    """Extract gate and place number for one record or for an iterable of rows.

    This name used to be defined twice with incompatible signatures, so the
    later ``(rows)`` definition shadowed the ``(hn_id, address)`` one and
    two-argument calls failed. Both call shapes are now supported:

    * ``gate_extract_tuple(hn_id, address)`` returns
      ``(hn_id, gate, place_num)``.
    * ``gate_extract_tuple(rows)`` returns a list of
      ``[hn_id, gate, place_num]`` (see ``gate_extract_partition``).

    Args:
        hn_id: Record id, or the iterable of rows in the single-argument form.
        address: Address text for the record; omit it for the rows form.
        queryParser: Optional parser to reuse.
    """
    if address is _ADDRESS_NOT_GIVEN:
        return gate_extract_partition(hn_id, queryParser)
    gate, place_num = address_extract(address, queryParser)
    return hn_id, gate, place_num
# Set SPARK_HOME and PYTHONPATH to a local Spark 3.0.3 distribution
# (developer-machine paths).
os.environ.update({
    'SPARK_HOME': '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7',
    'PYTHONPATH': '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python',
})
if __name__ == '__main__':
    # Script entry point: read (hn_id, address) rows over JDBC, extract the
    # gate and place number for each row, and append the results to the
    # 'hn_gate_2' table.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, StructField

    # PostgreSQL connection settings; the "xxx" values are placeholders.
    from_database = "poi_product"
    from_user = "xxx"
    from_pw = "xxx"
    from_host = "192.168.160.12"
    from_port = "15450"
    jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
    connectionProperties = {
        "user": from_user,
        "password": from_pw,
        "driver": "org.postgresql.Driver"
    }

    # Schema of the output rows; every column is a nullable string.
    gate_schema = StructType([
        StructField("hn_id", StringType(), True),
        StructField("gate", StringType(), True),
        StructField("place_num", StringType(), True)
    ])

    spark = SparkSession.builder.appName('hnApp').getOrCreate()
    try:
        # The JDBC "table" is a parenthesised sub-query with an alias, limited
        # to 20 rows.
        sql = "(select hn_id,address from poi_hn_edit_0826 limit 20) tmp"
        df = spark.read.jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
        df.show(2)

        # Per-row extraction. Per-partition alternative:
        #   rdd2 = df.rdd.mapPartitions(gate_extract_tuple)
        rdd2 = df.rdd.map(lambda e: gate_extract_tuple(e['hn_id'], e['address']))
        print(rdd2.take(2))

        df2 = spark.createDataFrame(rdd2, gate_schema)
        df2.show(2)
        df2.write.jdbc(url=jdbcUrl, table='hn_gate_2', mode='append', properties=connectionProperties)
    finally:
        # Release the Spark session even when a read/transform/write step fails.
        spark.stop()
网友评论