
Spark Configuration and Common DataFrame Operations

Author: 吵吵人 | Published: 2023-02-13 19:58
    from pyspark.sql import SparkSession
    from pyspark.sql import HiveContext
    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *
    from pyspark.sql.window import Window
    from pyspark import StorageLevel
    import uuid
    import pandas as pd
    
    random_name = str(uuid.uuid1()).replace('-', '_')
    spark_app_name = 'just_a_test_' + random_name
    
    spark = SparkSession.builder \
        .config('spark.executor.memory', '10g') \
        .config('spark.executor.cores', '6') \
        .config('spark.executor.instances', '70') \
        .config('spark.driver.memory', '8g') \
        .config('spark.driver.maxResultSize', '10g') \
        .config('spark.rpc.message.maxSize', '256') \
        .config('spark.default.parallelism', '2000') \
        .config("spark.dynamicAllocation.enabled", "true") \
        .config("spark.dynamicAllocation.executorIdleTimeout", "300s") \
        .config('spark.driver.extraJavaOptions', '-Xss10m') \
        .config('spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation', 'true') \
        .config('spark.sql.hive.convertMetastoreOrc', 'true') \
        .config('spark.sql.crossJoin.enabled', 'true') \
        .config('spark.dynamicAllocation.maxExecutors', '300') \
        .appName(spark_app_name) \
        .enableHiveSupport().getOrCreate()
    
    # spark.executor.instances: number of executor instances used to run the job
    # spark.driver.maxResultSize: limit on the total size of results returned to the driver; setting it too high can cause driver OOM
    # spark.rpc.message.maxSize: maximum message size (MB) for RPC communication between nodes
    # spark.default.parallelism: default number of tasks (partitions) for RDD operations
    # spark.sql.sources.default: default data source format
    # spark.dynamicAllocation.enabled: enable dynamic resource allocation
    # spark.driver.extraJavaOptions: extra JVM options for the driver; -Xss10m sets the JVM thread stack size to 10 MB
    # spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation: allows (re)creating a managed table whose location is non-empty, e.g. when a table was dropped but its files remain on disk
    # spark.sql.hive.convertMetastoreOrc: works around Hive/Spark metadata inconsistencies that prevent reading ORC tables
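    
    # Quick sanity check that the settings above actually took effect (a minimal
    # sketch; any key set on the builder can be read back the same way):
    # print('executor memory:', spark.conf.get('spark.executor.memory'))              # '10g'
    # print('dynamic allocation:', spark.conf.get('spark.dynamicAllocation.enabled')) # 'true'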
    
    sc = spark.sparkContext
    hiveCtx = HiveContext(sc)
    print('spark.version:', spark.version)   # 2.4.4
    print('application id:', sc.applicationId)  # application_1627462564975_68133043
    
    # Read data
    pt = '20221011'
    df = spark.sql(f"""   
        select
          can2_id,
          city_name,
          cast(lng as bigint) as lng,
          cast(lat as bigint) as lat
        from
          XXXX
        where
          pt = '{pt}'  
        limit 10
     """).cache() # cache和persist类似,persist可选择存储等级;persist() + show()不能缓存全部数据
    # .persist(StorageLevel.MEMORY_ONLY_SER) ,MEMORY_ONLY_SER序列化之后再在内存存储.对应释放 unpersist() 
    df.show()
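    
    # A sketch of the persist()/unpersist() variant mentioned above. count() touches
    # every row, so (unlike show()) it materializes the whole cache:
    # df2 = spark.sql("select ...").persist(StorageLevel.MEMORY_ONLY_SER)
    # df2.count()      # force the full DataFrame into the serialized in-memory cache
    # ...              # reuse df2 across several jobs
    # df2.unpersist()  # release the cached data when done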
    
    # Basic actions
    df_collect = df.collect() # list
    for it in df_collect:
        print(it['can2_id'])
        
    df.count() # int: 10
    df.describe() # pyspark.sql.dataframe.DataFrame: DataFrame[summary: string, can2_id: string, city_name: string, lng: string, lat: string]
    df.describe().show()
    # +-------+--------------------+---------+-----------------+-----------------+
    # |summary|             can2_id|city_name|              lng|              lat|
    # +-------+--------------------+---------+-----------------+-----------------+
    # |  count|                  10|       10|               10|               10|
    # |   mean|1.275945804931907...|     null|            115.5|             29.8|
    # | stddev|9.647139018018636E19|     null|4.006938426723771|4.211096452627668|
    # |    min|20221011111669149...|   上海市|              109|               22|
    # |    max|20221011876173328715|   邵阳市|              121|               38|
    # +-------+--------------------+---------+-----------------+-----------------+
    
    df.first() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
    df.head() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
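    
    # For small results it is often convenient to pull the data to the driver as a
    # pandas DataFrame (a sketch; only safe here because df was limited to 10 rows):
    pdf = df.toPandas()
    print(pdf.head())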
    
    # Basic schema information
    df.columns  # list : ['can2_id', 'city_name', 'lng', 'lat']
    df.dtypes # list: [('can2_id', 'string'),('city_name', 'string'),('lng', 'bigint'),('lat', 'bigint')]; note that describe() reported lng as string, because its summary statistics are all strings
    df.printSchema() # lng shows up as long here, which is the same type as bigint
    # root
    #  |-- can2_id: string (nullable = true)
    #  |-- city_name: string (nullable = true)
    #  |-- lng: long (nullable = true)
    #  |-- lat: long (nullable = true)
    
    df.explain(True) # prints the execution plan, useful for tuning; similar to the DAG shown in the Spark UI
    df.createOrReplaceGlobalTempView('tmp_table_view')   # registerTempTable was deprecated in Spark 2.0; createOrReplaceTempView is its direct replacement, and the Global variant used here makes the view visible across sessions
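    
    # Global temp views are registered under the global_temp database (a minimal usage sketch):
    spark.sql("select * from global_temp.tmp_table_view limit 5").show()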
    
    # Creating DataFrames
    # Row by row, from an RDD
    schema = StructType([StructField('id',LongType(),True),
                        StructField('name',StringType(),True),
                        StructField('age',IntegerType(),True)])
    rdd = sc.parallelize([(1,'Karal',19),(2,'Bob',18)])
    data = rdd.toDF(['id','name','age'])
    # 
    # +---+-----+---+
    # | id| name|age|
    # +---+-----+---+
    # |  1|Karal| 19|
    # |  2|  Bob| 18|
    # +---+-----+---+
    data = spark.createDataFrame(rdd, schema = schema)
    
    # +---+-----+---+
    # | id| name|age|
    # +---+-----+---+
    # |  1|Karal| 19|
    # |  2|  Bob| 18|
    # +---+-----+---+
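    
    # The same DataFrame can also be built directly from a Python list of tuples,
    # skipping the RDD step (a small sketch reusing the schema defined above):
    data = spark.createDataFrame([(1, 'Karal', 19), (2, 'Bob', 18)], schema=schema)
    data.show()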
    
    # Column by column, from a pandas DataFrame
    ids = [1,2,3]
    grades = [3,3,1]
    name = ['Karal','Bob','Lily']
    age = [19,18,15]
    data = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
    data.show()
    
    # Column by column: a second DataFrame, used for the joins/unions below
    ids = [5,2,2]
    grades = [2,3,3]
    name = ['Maria','Bob','Bob']
    age = [17,18,18]
    data2 = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
    data2.show()
    
    # Aggregation queries
    data.groupBy('grade').agg(F.max('age').alias('max_age'),F.max('id').alias('max_id')).show()
    
    # Custom UDAF, two options: 1. collect each group's values into a list and apply a UDF to it; 2. use pandas_udf (see the sketch after this block)
    # Two ways to define a UDF: 1. the @udf decorator (the decorated function can no longer be called as a plain Python function outside Spark); 2. wrap a plain function with F.udf() when it is needed
    @udf(returnType = IntegerType())
    def udf_max(l1):
        return max(l1)
    # equivalent F.udf() form: udf_max_new = F.udf(lambda l1: max(l1), IntegerType())
    
    data.groupBy('grade').agg(F.collect_list('age').alias('list_info')).withColumn('max_age',udf_max('list_info'))
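    
    # Option 2 sketched with pandas_udf (Spark 2.3+, requires pyarrow; the GROUPED_AGG
    # variant aggregates a whole column of each group at once):
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    @pandas_udf(IntegerType(), PandasUDFType.GROUPED_AGG)
    def pandas_max_age(v):
        # v is a pandas Series holding all 'age' values of one group
        return int(v.max())
    
    data.groupBy('grade').agg(pandas_max_age('age').alias('max_age')).show()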
    
    # Registering a UDF for use in SQL
    def my_add(a):
        return a+1
    spark.udf.register('udf_add',my_add,IntegerType())
    
    data.createOrReplaceTempView('data_view')
    sqldf = spark.sql("""
        select 
        *,
        udf_add(age) as age_new
        from data_view
    """)
    sqldf.show()
    # +---+-----+---+-----+-------+
    # | id| name|age|grade|age_new|
    # +---+-----+---+-----+-------+
    # |  1|Karal| 19|    3|     20|
    # |  2|  Bob| 18|    3|     19|
    # |  3| Lily| 15|    1|     16|
    # +---+-----+---+-----+-------+
    
    
    data.withColumn('gender',F.lit('female')).withColumn('grade_new',F.col('grade')).withColumnRenamed('grade','grade_old').show()
    # +---+-----+---+---------+------+---------+
    # | id| name|age|grade_old|gender|grade_new|
    # +---+-----+---+---------+------+---------+
    # |  1|Karal| 19|        3|female|        3|
    # |  2|  Bob| 18|        3|female|        3|
    # |  3| Lily| 15|        1|female|        1|
    # +---+-----+---+---------+------+---------+
    
    data.join(data2,on='id',how='left').show()
    # +---+-----+---+-----+----+----+-----+
    # | id| name|age|grade|name| age|grade|
    # +---+-----+---+-----+----+----+-----+
    # |  1|Karal| 19|    3|null|null| null|
    # |  3| Lily| 15|    1|null|null| null|
    # |  2|  Bob| 18|    3| Bob|  18|    3|
    # |  2|  Bob| 18|    3| Bob|  18|    3|
    # +---+-----+---+-----+----+----+-----+
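    
    # As the output shows, non-key columns from both sides keep their original names
    # after the join, which makes them ambiguous to reference. A common workaround
    # (a sketch) is to rename the right-hand columns before joining:
    data2_renamed = data2.selectExpr('id', 'name as name2', 'age as age2', 'grade as grade2')
    data.join(data2_renamed, on='id', how='left').show()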
    
    data.union(data2).show()
    # +---+-----+---+-----+
    # | id| name|age|grade|
    # +---+-----+---+-----+
    # |  1|Karal| 19|    3|
    # |  2|  Bob| 18|    3|
    # |  3| Lily| 15|    1|
    # |  5|Maria| 17|    2|
    # |  2|  Bob| 18|    3|
    # |  2|  Bob| 18|    3|
    # +---+-----+---+-----+
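    
    # union() matches columns by position. If the two DataFrames have the same columns
    # in a different order, unionByName() (Spark 2.3+) matches them by name instead:
    data.unionByName(data2.select('grade', 'id', 'name', 'age')).show()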
    
    
    data.where((F.col('age')>=18) & (F.col('age')<19)).show()
    data.filter("age == 18").show()
    data.withColumn('age_string',F.col('age').cast('string')).show()
    data.drop('grade').show()
    data.withColumn('is_adult',F.when(F.col("age")<18,0).when(F.col("age")==18,1).otherwise(2)).show()
    data.dropDuplicates()
    data.withColumn("rank_age",F.row_number().over(Window.partitionBy('grade').orderBy(F.col("age").desc()))).show()
    
    # Combining and splitting columns
    data.withColumn('info_arr',F.array(['id','name','age','grade'])).show()
    data.withColumn('info_concat',F.concat_ws('#','id','name','age')).show()
    # +---+-----+---+-----+-----------------+
    # | id| name|age|grade|         info_arr|
    # +---+-----+---+-----+-----------------+
    # |  1|Karal| 19|    3|[1, Karal, 19, 3]|
    # |  2|  Bob| 18|    3|  [2, Bob, 18, 3]|
    # |  3| Lily| 15|    1| [3, Lily, 15, 1]|
    # +---+-----+---+-----+-----------------+
    # +---+-----+---+-----+-----------+
    # | id| name|age|grade|info_concat|
    # +---+-----+---+-----+-----------+
    # |  1|Karal| 19|    3| 1#Karal#19|
    # |  2|  Bob| 18|    3|   2#Bob#18|
    # |  3| Lily| 15|    1|  3#Lily#15|
    # +---+-----+---+-----+-----------+
    
    data_concat = data.withColumn('info_concat',F.concat_ws('#','id','name','age'))
    data_concat.withColumn('info_explode_id',F.explode(F.split('info_concat','#'))).show()
    
    # +---+-----+---+-----+-----------+---------------+
    # | id| name|age|grade|info_concat|info_explode_id|
    # +---+-----+---+-----+-----------+---------------+
    # |  1|Karal| 19|    3| 1#Karal#19|              1|
    # |  1|Karal| 19|    3| 1#Karal#19|          Karal|
    # |  1|Karal| 19|    3| 1#Karal#19|             19|
    # |  2|  Bob| 18|    3|   2#Bob#18|              2|
    # |  2|  Bob| 18|    3|   2#Bob#18|            Bob|
    # |  2|  Bob| 18|    3|   2#Bob#18|             18|
    # |  3| Lily| 15|    1|  3#Lily#15|              3|
    # |  3| Lily| 15|    1|  3#Lily#15|           Lily|
    # |  3| Lily| 15|    1|  3#Lily#15|             15|
    # +---+-----+---+-----+-----------+---------------+
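    
    # posexplode() works like explode() but also returns each element's position; since
    # it produces two columns, it is used inside select() rather than withColumn() (a sketch):
    data_concat.select('*', F.posexplode(F.split('info_concat', '#')).alias('pos', 'val')).show()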
    
