When running Spark, you sometimes need to change configuration values such as the number of executor cores or the memory size. This can be done with the code below.
First, inspect the current SparkContext configuration:
# sc.getConf().getAll() works as well, or use the line below
spark.sparkContext._conf.getAll()
# Output
[('spark.dynamicAllocation.enabled', 'false'),
('spark.yarn.appMasterEnv.JAVA_HOME', '/opt/jdk1.8.0_141/'),
('spark.driver.port', '37680'),
('spark.kryoserializer.buffer', '64k'),
('spark.hadoop.io.compression.codecs',
'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS',
'pg-dmp-master1.hadoop:8088,pg-dmp-master2.hadoop:8088'),
('spark.app.name', 'JuPySparkHub'),
('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
('spark.python.worker.memory', '10g'),
('spark.driver.host', 'pg-dmp-python1'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
'http://pg-dmp-master1.hadoop:8088/proxy/application_1593416689901_2911,http://pg-dmp-master2.hadoop:8088/proxy/application_1593416689901_2911'),
('spark.serializer.objectStreamReset', '100'),
('spark.kryoserializer.buffer.max', '256m'),
('spark.submit.deployMode', 'client'),
('spark.yarn.am.memory', '2g'),
('spark.yarn.dist.jars',
'file:///opt/spark-2.4.6-bin-hadoop2.6/jars/greenplum-spark_2.11-1.6.2.jar'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
'pg-dmp-master1.hadoop,pg-dmp-master2.hadoop'),
('spark.ui.filters',
'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
('spark.executorEnv.JAVA_HOME', '/opt/jdk1.8.0_141/'),
('spark.shuffle.service.enabled', 'true'),
('spark.yarn.secondary.jars', 'greenplum-spark_2.11-1.6.2.jar'),
('spark.executor.memory', '10g'),
('spark.executor.id', 'driver'),
('spark.executor.cores', '2'),
('spark.master', 'yarn'),
('spark.sql.warehouse.dir', '/user/hive/warehouse'),
('spark.sql.catalogImplementation', 'hive'),
('spark.hadoop.io.compression.codec.lzo.class',
'com.hadoop.compression.lzo.LzoCodec'),
('spark.rdd.compress', 'True'),
('spark.ui.proxyBase', '/proxy/application_1593416689901_2884'),
('spark.app.id', 'application_1593416689901_2884'),
('spark.driver.appUIAddress', 'http://pg-dmp-python1:4049'),
('spark.repl.local.jars',
'file:///opt/spark-2.4.6-bin-hadoop2.6/jars/greenplum-spark_2.11-1.6.2.jar'),
('spark.yarn.isPython', 'true'),
('spark.executorEnv.PYTHONPATH',
'/opt/spark-2.4.6-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip:/opt/spark-2.4.6-bin-hadoop2.6/python/<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
('spark.ui.showConsoleProgress', 'true')]
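If only one or two settings matter, you can read them individually instead of dumping the whole list. A minimal sketch; the keys and expected values are taken from the output above:
# Read single values from a copy of the current SparkConf
spark.sparkContext.getConf().get('spark.executor.memory')   # '10g' in the output above
spark.sparkContext.getConf().get('spark.executor.cores')    # '2' in the output above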
Change the settings you need; any setting not explicitly specified keeps its previous value:
from pyspark.sql import SparkSession
conf = spark.sparkContext._conf.setAll([('spark.dynamicAllocation.enabled', 'false'),
('spark.driver.memory', '7g')])
# In a Zeppelin environment, comment out this stop step instead of running it
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Inspect the updated configuration
spark.sparkContext._conf.getAll()
[('spark.dynamicAllocation.enabled', 'false'),
('spark.yarn.appMasterEnv.JAVA_HOME', '/opt/jdk1.8.0_141/'),
('spark.driver.port', '37680'),
('spark.kryoserializer.buffer', '64k'),
('spark.hadoop.io.compression.codecs',
'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS',
'pg-dmp-master1.hadoop:8088,pg-dmp-master2.hadoop:8088'),
('spark.app.name', 'JuPySparkHub'),
('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
('spark.python.worker.memory', '10g'),
('spark.driver.memory', '7g'),
('spark.driver.host', 'pg-dmp-python1'),
('spark.serializer.objectStreamReset', '100'),
('spark.kryoserializer.buffer.max', '256m'),
('spark.submit.deployMode', 'client'),
('spark.yarn.am.memory', '2g'),
('spark.yarn.dist.jars',
'file:///opt/spark-2.4.6-bin-hadoop2.6/jars/greenplum-spark_2.11-1.6.2.jar'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
'pg-dmp-master1.hadoop,pg-dmp-master2.hadoop'),
('spark.ui.filters',
'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
('spark.executorEnv.JAVA_HOME', '/opt/jdk1.8.0_141/'),
('spark.shuffle.service.enabled', 'true'),
('spark.yarn.secondary.jars', 'greenplum-spark_2.11-1.6.2.jar'),
('spark.executor.memory', '10g'),
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
'http://pg-dmp-master1.hadoop:8088/proxy/application_1593416689901_2936,http://pg-dmp-master2.hadoop:8088/proxy/application_1593416689901_2936'),
('spark.executor.id', 'driver'),
('spark.executor.cores', '2'),
('spark.master', 'yarn'),
('spark.sql.warehouse.dir', '/user/hive/warehouse'),
('spark.sql.catalogImplementation', 'hive'),
('spark.hadoop.io.compression.codec.lzo.class',
'com.hadoop.compression.lzo.LzoCodec'),
('spark.rdd.compress', 'True'),
('spark.ui.proxyBase', '/proxy/application_1593416689901_2884'),
('spark.app.id', 'application_1593416689901_2884'),
('spark.driver.appUIAddress', 'http://pg-dmp-python1:4049'),
('spark.repl.local.jars',
'file:///opt/spark-2.4.6-bin-hadoop2.6/jars/greenplum-spark_2.11-1.6.2.jar'),
('spark.yarn.isPython', 'true'),
('spark.executorEnv.PYTHONPATH',
'/opt/spark-2.4.6-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip:/opt/spark-2.4.6-bin-hadoop2.6/python/<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
('spark.ui.showConsoleProgress', 'true')]
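The same pattern applies to the executor settings mentioned at the top. A minimal sketch, assuming the cluster has resources for the new values (the numbers are only examples); on YARN these settings only take effect for a newly started application, which is why the existing context is stopped first:
from pyspark.sql import SparkSession

# Only the keys listed here are changed; everything else keeps its current value
conf = spark.sparkContext._conf.setAll([('spark.executor.cores', '4'),
                                        ('spark.executor.memory', '8g')])
spark.sparkContext.stop()   # stop the current application
spark = SparkSession.builder.config(conf=conf).getOrCreate()   # a new application is launched here
sc = spark.sparkContext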
If the approach above does not work, the following method can be used instead:
import pyspark
spark = pyspark.sql.SparkSession.builder \
    .master("local[20]") \
    .appName("Feature Engineering") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()
spark.sparkContext._conf.getAll()
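Note that getOrCreate() returns any SparkSession that is already active, in which case options such as master will not change; in an interactive notebook you may need to stop the existing session first, as in the earlier example. A quick way to verify the change (using the key set above):
spark.sparkContext.getConf().get('spark.driver.maxResultSize')   # expected '2g'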
References:
https://stackoverflow.com/questions/41886346/spark-2-1-0-session-config-settings-pyspark
https://stackoverflow.com/questions/32362783/how-to-change-sparkcontext-properties-in-interactive-pyspark-session