美文网首页
使用spark运行任务的方法

使用spark运行任务的方法

作者: Lyudmilalala | 来源:发表于2021-12-26 22:02 被阅读0次
  1. 调用pyspark终端
    安装好spark后,调用./spark-3.2.0-bin-hadoop3.2/bin/pyspark可以启动pyspark终端交互命令行
root@db3beeb11ec7:/home# ./spark-3.2.0-bin-hadoop3.2/bin/pyspark
Python 3.9.2 (default, Feb 28 2021, 17:03:44) 
[GCC 10.2.1 20210110] on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/25 16:48:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Python version 3.9.2 (default, Feb 28 2021 17:03:44)
Spark context Web UI available at http://db3beeb11ec7:4040
Spark context available as 'sc' (master = local[*], app id = local-1640450903650).
SparkSession available as 'spark'.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a",2), ("c",5)])
>>> print(sorted(rdd.groupByKey().mapValues(list).collect()))
[('a', [1, 1, 2]), ('b', [1]), ('c', [5])] 
>>> from pyspark.sql import Row
>>> from datetime import datetime, date
>>> df = spark.createDataFrame([
...     Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
...     Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
...     Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
... ])
>>> df.show()
+---+---+-------+----------+-------------------------------+
|  a  | b |     c.  |      d     |                    e                |
+---+---+-------+----------+-------------------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+--------------------------------+
  1. 使用spark命令调用脚本

可以通过命令行./bin/spark-submit <script_name>调用使用pyspark的脚本

运行脚本的话需要事先额外安装py4j依赖

pip3 install py4j

并且需要将SparkContext, SparkSession, SparkConf等的初始化挪到脚本中自己编写,比如编写如下spark-test1.py的脚本

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext("local")
spark = SparkSession.builder.getOrCreate()
# do simple operations
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a",2), ("c",5)])
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# init simple dataframes
from pyspark.sql import Row
from datetime import datetime, date
df = spark.createDataFrame([Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, >
df.show()

再使用命令行调用

./spark-3.2.0-bin-hadoop3.2/bin/spark-submit spark-test1.py

终端除结果外还会打印出许多spark的日志,比如:

21/12/26 13:54:20 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1427
21/12/26 13:54:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[5] at collect at /home/spark-test1.py:8) (first 15 tasks are for partitions Vector(0))
21/12/26 13:54:20 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
21/12/26 13:54:20 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (db3beeb11ec7, executor driver, partition 0, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
21/12/26 13:54:20 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
21/12/26 13:54:21 INFO ShuffleBlockFetcherIterator: Getting 1 (106.0 B) non-empty blocks including 1 (106.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
21/12/26 13:54:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 43 ms
21/12/26 13:54:21 INFO PythonRunner: Times: total = 47, boot = -1364, init = 1410, finish = 1
21/12/26 13:54:21 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1717 bytes result sent to driver
21/12/26 13:54:21 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 687 ms on db3beeb11ec7 (executor driver) (1/1)
21/12/26 13:54:21 INFO DAGScheduler: ResultStage 1 (collect at /home/spark-test1.py:8) finished in 0.719 s
21/12/26 13:54:21 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
21/12/26 13:54:21 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/12/26 13:54:21 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
21/12/26 13:54:21 INFO DAGScheduler: Job 0 finished: collect at /home/spark-test1.py:8, took 2.557911 s
[('a', [1, 1, 2]), ('b', [1]), ('c', [5])]
  1. 使用python直接运行脚本

调用前要确认已经配置spark到python的环境路径,否则会查找不到pyspark模块,并报如下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'pyspark'

将以下配置添加到/etc/profile中:

export SPARK_HOME=/home/spark-3.2.0-bin-hadoop3.2
export PYTHONPATH=$SPARK_HOME/python
export PATH=$PATH:$SPARK_HOME/bin

使用python直接运行编写的脚本spark-test1.py

root@db3beeb11ec7:/home# python3 spark-test1.py 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/25 17:34:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[('a', [1, 1, 2]), ('b', [1]), ('c', [5])]                                      
+---+---+-------+----------+-------------------------------+
|  a  | b |     c.  |      d     |                    e                |
+---+---+-------+----------+-------------------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+--------------------------------+

相关文章

网友评论

      本文标题:使用spark运行任务的方法

      本文链接:https://www.haomeiwen.com/subject/uygjqrtx.html