美文网首页pyspark学习
11 打包Spark应用程序

11 打包Spark应用程序

作者: 7125messi | 来源:发表于2018-01-20 13:37 被阅读253次

    测试时,我们使用jupyter notebook可以方便开发调试程序。但是,当我们实际生产过程中,需要执行一个计划作业时,该作业每小时都在运行,jupyter notebook就工作不了了。如何去利用可以重用的模块形式去编写脚本,打包好我们的Spark应用程序,直至最后提交Spark作业用于生产,这是需要我们掌握的一项技能。

    1 spark-submit命令

    # 一般级别的,语法如下:
    spark-submit [options] <python file> [app arguments]
    

    提交作业到Spark的入口点(本地或集群)是spark-submit脚本,该脚本不仅可以提交作业,也可以终止作业或检查其状态。
    spark-submit命令提供了一个统一的API把应用程序部署到各种Spark支持的集群管理器上(如YARN),从而免除了单独配置每个应用程序。

    PySpark命令行参数

    --master:用于设置主节点的URL参数。
    (1)local 用于执行本地机器的代码。
    如果你传递local参数,Spark会运行一个单一的线程(不会利用任何并行线程)。
    在一个多核机器上,local[n]来为Spark指定一个具体使用的内核数,n指的是使用的内核数。
    通过loacl[*]来制定运行和Spark机器内核一样多的复杂线程。
    (2)spark://host:port 这是一个URL和一个Spark单机集群的端口(不运行任何作业调度,如Mesos或者Yarn)
    (3)mesos://host:port 这是一个URL和一个部署在Mesos上的Spark集群端口
    (4)yarn 作为一个负载均衡器,用于从运行Yarn的主节点提交作业。

    --deploy-mode:允许你决定是否在本地(使用client)启动Spark驱动程序的参数,或者在集群内(使用cluster)的任意一台机器上启动。此参数默认值为client。

    --name:应用程序的名字,如果在创建SparkSession时,以编程方式指定应用程序名称,那么来自命令行的参数会被重写。

    --py-files : .py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序。这些文件将被交付给每一个执行器来使用。

    --file :命令给出一个逗号分隔的文件列表,这些文件将被交付到每一个执行器来使用。

    --conf : 参数通过命令行动态地更改应用程序的配置。语法是:

    <Spark property>=<value for the property> 。
    例如:
    --conf spark.local.dir=/opt/Spark2.2.0/ 
    --conf spark.app.name=com.xxx.passengerflow.metro_jh
    

    需要注意的是Spark有3个地方使用配置参数:
    (1)最高优先级的是在SparkContext时,指定了SparkConf的参数获得最高优先权;
    (2)第二优先权:spark-submit传递给的参数;
    (3)第三优先权:conf/spark-default.conf文件中指定的参数。

    --properties-file :配置文件,它应该有和conf/spark-defaults.conf文件相同的属性设置,也是可读的。

    --driver-memory:指定应用程序在驱动程序上分配多少内存的参数。允许的值有一个语法限制,类似于1000M,2G。默认值为1024M。

    --executor-memory:参数指定每个执行器上为应用程序分配多少内存。默认值为1G。

    --help:展示帮助信息和退出

    --verbose:在运行应用程序时打印附加调试信息。

    --version:打印spark版本

    --driver-cores:(在spark单机cluster或者Yarn上部署cluster模式下),允许指定驱动程序的内核数量(默认值为1)

    --kill:将完成的过程赋予submission_id

    --status:如果指定了该命令,它将请求指定的应用程序的状态。

    --total-executor-cores:(在spark单机或Mesos client部署模式下)该参数会为所有执行器(不是每一个)请求指定的内核数量。

    在YARN集群提交时可以指定的:

    --queue:该参数指定了YARN上的队列,以便于将该作业提交到队列(默认值是default)

    --num-executors:指定需要多个执行器来请求该作业的参数。如果启动了动态分配,则执行器的初始数量至少是指定的数量。

    2 以编程方式部署应用程序

    如何创建和配置SparkSession?

    如何对Spark使用外部模块?

    2.1 配置你的SparkSession(或者sc)

    以编程方式使用Jupyter和提交作业的主要区别是,你必须创建Spark context上下文背景环境,而使用Jupyter运行Spark,上下文背景会自动开始。

    2.2 创建SparkSession

    Spark2.2.0

    from pyspark.sql import SparkSession
    spark = SparkSession \
                  .builder \
                  .appName('CalculatingGeoDistances') \
                  .getOrCreate()
    print('Session created')
    如果此时你想创建一个SparkContext,可以直接使用:
    sc = spark.SparkContext
    

    Spark1.6.2

    from pyspark import SparkContext
    sc = SparkContext \
                  .builder \
                  .appName('CalculatingGeoDistances') \
                  .getOrCreate()
    print('sc created')
    

    2.3 模块化代码(重要重要!!!)

    以模块化的形式构建代码,以便于以后重用该代码是一件值得做的事情。
    例子:建立一个模块,并且在数据集上做一些计算,计算出上车和下车位置的直线距离(英里),并且将英里转换为公里。
    (1)模块结构

    image.png image.png

    在Python包的结构中,在顶层有一个setup.py文件,所以可以打包我们的模块。

    setup.py文件内容如下:

    from setuptools import setup
    
    setup(
        name='PySparkUtilities',
        version='0.1dev',
        packages=['utilities', 'utilities/converters'],
        license='''
            Creative Commons 
            Attribution-Noncommercial-Share Alike license''',
        long_description='''
            An example of how to package code for PySpark'''
    )
    

    关于如何定义其他项目的setup.py文件,可以参考:
    https://pythonhosted.org/an_example_pypi_project/setuptools.html

    image.png
    init.py文件内容如下:
    from .geoCalc import geoCalc
    __all__ = ['geoCalc','converters']
    

    (2)计算两点之间的距离
    该代码位于该模块的geoCalc.py文件中

    import math
    
    class geoCalc(object):
        @staticmethod
        def calculateDistance(p1, p2):
            '''
                calculates the distance using Haversine formula
            '''
            R = 3959 # earth's radius in miles
    
            # get the coordinates
            lat1, lon1 = p1[0], p1[1]
            lat2, lon2 = p2[0], p2[1]
    
            # convert to radians
            deltaLat_radians = math.radians(lat2-lat1)
            deltaLon_radians = math.radians(lon2-lon1)
    
            lat1_radians = math.radians(lat1)
            lat2_radians = math.radians(lat2)
    
            # apply the formula
            hav = math.sin(deltaLat_radians / 2.0) * \
                math.sin(deltaLat_radians / 2.0) + \
                math.sin(deltaLon_radians / 2.0) * \
                math.sin(deltaLon_radians / 2.0) * \
                math.cos(lat1_radians) * \
                math.cos(lat2_radians) 
    
            dist = 2 * R * math.asin(math.sqrt(hav)) 
    
            return dist
    
    if __name__ == '__main__':
        p1 = {'address': '301 S Jackson St, Seattle, WA 98104',
        'lat': 47.599200, 
        'long': -122.329841}
    
        p2 = {'address': 'Thunderbird Films Inc 533, Smithe St #401, Vancouver, BC V6B 6H1, Canada',
            'lat': 49.279688, 
            'long': -123.119190}
    
        print(geoCalc.calculateDistance((p1['lat'], p1['long']), (p2['lat'], p2['long'])))
    

    calculateDistance()是geoCalc类的静态方法,它需要两个地理位置,表示为一个元祖或者一个具有两个元素的列表(按照顺序排列的维度和经度),并且使用Haversine公式计算距离(英里)。

    (3)转变距离单位
    为了便于使用,作为converter实现的任何类都应该公开类似的窗口,查看base.py文件内容:

    from abc import ABCMeta, abstractmethod
    
    class BaseConverter(metaclass=ABCMeta):
        @staticmethod
        @abstractmethod
        def convert(f, t):
            raise NotImplementedError
    
    if __name__ == '__main__':
        i = BaseConverter()
    

    distance.py内容:

    from ..base import BaseConverter
    
    class metricImperial(BaseConverter):
        pass
    
        @staticmethod
        def convert(f, t):
            conversionTable = {
                'in': {  
                    'mm': 25.4,    'cm': 2.54,     'm': 0.0254, 
                    'km': 0.0000254
                }, 'ft': {  
                    'mm': 304.8,   'cm': 30.48,    'm': 0.3048,
                    'km': 0.0003048
                }, 'yd': {  
                    'mm': 914.4,   'cm': 91.44,    'm': 0.9144,
                    'km': 0.0009144
                }, 'mile': {    
                    'mm': 1609344, 'cm': 160934.4, 'm': 1609.344,
                    'km': 1.609344
                }   
            }
    
            f_val, f_unit = f.split(' ')
            f_val = float(f_val)
    
            if f_unit in conversionTable.keys():
                if t in conversionTable[f_unit].keys():
                    conv = 1 / conversionTable[f_unit][t]
                else:
                    raise KeyError('Key {0} not found...' \
                        .format(t))
            elif t in conversionTable.keys():
                if f_unit in conversionTable[t].keys():
                    conv = conversionTable[t][f_unit]
                else:
                    raise KeyError('Key {0} not found...' \
                        .format(f_unit))
            else:
                raise KeyError('Neither {0} nor {1} key found'\
                    .format(t, f_unit))
    
            return f_val / conv
    
    if __name__ == '__main__':
        f = metricImperial()
        print(f.convert('10 mile', 'km'))
    

    (4)打包:创建一个egg文件
    PySpark文档指出,可以用逗号来分隔传递.py文件给spark-submit脚本。其实最方便的是把模块打包进一个.zip或者一个.egg。当setup.py文件方便使用时,调用additionalCode文件夹里的内容:

    python setup.py bdist_egg
    

    可以看到3个文件夹:PySparkUtilities.egg-info、build和dist
    dist文件夹有:PySparkUtilities-0.1.dev0-py3.5.egg

    (5)Spark中用户定义函数

    为了对在PySpark中的DataFrame执行操作,有两个选择:使用内置函数来处理数据(大多数情况下都足以达到你的需求,且作为更高性能的代码而被推荐使用);但是有的时候需要自定义函数去实现若干功能选项。

    为了定义一个UDF,必须把Python函数封装在.udf()方法中,并且定义它的返回值类型。以下是我们如何在脚本中实现
    calculatingGeoDistance.py文件

    import utilities.geoCalc as geo
    from utilities.converters import metricImperial
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    import pyspark.sql.functions as func
    
    def geoEncode(spark):
        # read the data in
        uber = spark.read.csv(
            'uber_data_nyc_2016-06_3m_partitioned.csv', 
            header=True, 
            inferSchema=True
            )\
            .repartition(4) \
            # .select('VendorID','tpep_pickup_datetime', 'pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude','total_amount')
    
        # prepare the UDFs
        getDistance = func.udf(
            lambda lat1, long1, lat2, long2: 
                geo.calculateDistance(
                    (lat1, long1),
                    (lat2, long2)
                )
            )
    
        convertMiles = func.udf(lambda m: 
            metricImperial.convert(str(m) + ' mile', 'km'))
    
        # create new columns
        uber = uber.withColumn(
            'miles', 
                getDistance(
                    func.col('pickup_latitude'),
                    func.col('pickup_longitude'), 
                    func.col('dropoff_latitude'), 
                    func.col('dropoff_longitude')
                )
            )
    
        uber = uber.withColumn(
            'kilometers', 
            convertMiles(func.col('miles')))
    
        # print 10 rows
        # uber.show(10)
    
        # save to csv (partitioned)
        uber.write.csv(
            'uber_data_nyc_2016-06_new.csv',
            mode='overwrite',
            header=True,
            compression='gzip'
        )
    
    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .appName('CalculatingGeoDistances') \
            .getOrCreate()
    
        print('Session created')
    
        try:
            geoEncode(spark)
    
        finally:
            spark.stop()
    

    2.4 提交作业

    可以在命令行键入以下命令:

    ./launch_spark_submit.sh \
    --master local[4] \
    --py-files additionalCode/dist/PySparkUtilities-0.1.dev0-
    py3.5.egg \
    calculatingGeoDistance.py
    

    其中launch_spark_submit.sh是spark-submit命令的封装,通过对jupyter设置了PYSPARK_DRIVER_PYTHON系统变量:

    #!/bin/bash
    unset PYSPARK_DRIVER_PYTHON
    spark-submit $*
    export PYSPARK_DRIVER_PYTHON=jupyter
    

    2.5 实际项目中,spark-submit的设置和打包

    (1)命令行模式

    集群模式:

    读取的文件必须放在HDFS上:

    spark-submit --master yarn \
                 --deploy-mode cluster \
                 --num-executors 25 \
                 --executor-cores 2 \
                 --driver-memory 4g \
                 --executor-memory 4g \
                 --conf spark.broadcast.compress=true \
                 --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
    

    (2)单节点模式

    单节点启动PySpark Shell
    (1)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar"
    
    (2)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar" \
            --num-executors 25 \
            --executor-cores 2 \
            --driver-memory 4g \
            --executor-memory 8g
    
    单节点提交任务
    spark-submit --jars "/data/spark/ojdbc6-11.2.0.3.jar" /pyspark_app/test.py
    
    ./bin/spark-submit --master lcoal[*] /home/ydzhao/pyspark_app/com.meihuichina.passengerflow.grid1km_laccell_grid100m.py
    

    (3)编辑打包成test.sh Shell脚本

    #!/usr/bin/env bash
    
    spark-submit --master yarn \
                 --deploy-mode cluster \
                 --num-executors 25 \
                 --executor-cores 2 \
                 --driver-memory 4g \
                 --executor-memory 4g \
                 --conf spark.broadcast.compress=true \
                 --conf spark.yarn.executor.memoryOverhead=900 \
                 --conf spark.sql.shuffle.partitions=20 \
                 --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
    # ./test.sh
    
    # 给test.sh相关权限
    chmod u+x test.sh
    

    运行test.sh

    ./test.sh

    2.6 监控执行

    image.png
    image.png

    相关文章

      网友评论

        本文标题:11 打包Spark应用程序

        本文链接:https://www.haomeiwen.com/subject/lgctaxtx.html