Spark Configuration and Common DataFrame Operations

Author: 吵吵人 | Published 2023-02-13 19:58
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import StorageLevel
import uuid
import pandas as pd

random_name = str(uuid.uuid1()).replace('-', '_')
spark_app_name = 'just_a_test_' + random_name

spark = SparkSession.builder \
    .config('spark.executor.memory', '10g') \
    .config('spark.executor.cores', '6') \
    .config('spark.executor.instances', '70') \
    .config('spark.driver.memory', '8g') \
    .config('spark.driver.maxResultSize', '10g') \
    .config('spark.rpc.message.maxSize', '256') \
    .config('spark.default.parallelism', '2000') \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s") \
    .config('spark.driver.extraJavaOptions', '-Xss10m') \
    .config('spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation', 'true') \
    .config('spark.sql.hive.convertMetastoreOrc', 'true') \
    .config('spark.sql.crossJoin.enabled', 'true') \
    .config('spark.dynamicAllocation.maxExecutors', '300') \
    .appName(spark_app_name) \
    .enableHiveSupport().getOrCreate()

# spark.executor.instances: how many executor instances the job uses
# spark.driver.maxResultSize: cap on results collected to the driver; setting it too high risks driver OOM
# spark.rpc.message.maxSize: maximum RPC message size for communication between nodes (MB)
# spark.default.parallelism: default number of tasks (partitions) for RDD operations
# spark.sql.sources.default: default data source
# spark.dynamicAllocation.enabled: enable dynamic resource allocation
# spark.driver.extraJavaOptions: extra JVM options for the driver; -Xss10m sets the thread stack size to 10 MB
# spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation: works around "table was dropped but still cannot be written" errors when writing Hive tables
# spark.sql.hive.convertMetastoreOrc: works around Hive/Spark metastore mismatches that make ORC tables unreadable
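
# Quick sketch: read the effective values back from the live session to confirm the settings took effect
print(spark.conf.get('spark.executor.memory'))             # '10g', as set above
print(spark.conf.get('spark.dynamicAllocation.enabled'))   # 'true'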

sc = spark.sparkContext
hiveCtx = HiveContext(sc)
print('spark.version:', spark.version)   # 2.4.4
print('application id:', sc.applicationId)  # application_1627462564975_68133043
# Read data
pt = '20221011'
df = spark.sql(f"""   
    select
      can2_id,
      city_name,
      cast(lng as bigint) as lng,
      cast(lat as bigint) as lat
    from
      XXXX
    where
      pt = '{pt}'  
    limit 10
 """).cache() # cache和persist类似,persist可选择存储等级;persist() + show()不能缓存全部数据
# .persist(StorageLevel.MEMORY_ONLY_SER) ,MEMORY_ONLY_SER序列化之后再在内存存储.对应释放 unpersist() 
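
# Sketch: switching the cached df to an explicit storage level (unpersist first, then persist again)
df.unpersist()                                  # drop the cache created by .cache() above
df = df.persist(StorageLevel.MEMORY_AND_DISK)   # persist() lets you choose the level; cache() is persist() with the default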
df.show()
# Basic actions
df_collect = df.collect() # a Python list of Row objects
for it in df_collect:
    print(it['can2_id'])
    
df.count() # int: 10
df.describe() # pyspark.sql.dataframe.DataFrame: DataFrame[summary: string, can2_id: string, city_name: string, lng: string, lat: string]
df.describe().show()
# +-------+--------------------+---------+-----------------+-----------------+
# |summary|             can2_id|city_name|              lng|              lat|
# +-------+--------------------+---------+-----------------+-----------------+
# |  count|                  10|       10|               10|               10|
# |   mean|1.275945804931907...|     null|            115.5|             29.8|
# | stddev|9.647139018018636E19|     null|4.006938426723771|4.211096452627668|
# |    min|20221011111669149...|   上海市|              109|               22|
# |    max|20221011876173328715|   邵阳市|              121|               38|
# +-------+--------------------+---------+-----------------+-----------------+

df.first() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
df.head() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)

# Basic metadata
df.columns  # list: ['can2_id', 'city_name', 'lng', 'lat']
df.dtypes # list: [('can2_id', 'string'),('city_name', 'string'),('lng', 'bigint'),('lat', 'bigint')], note that describe() above reported lng as string
df.printSchema() # here lng shows up as long (bigint and long are the same Spark SQL type)
# root
#  |-- can2_id: string (nullable = true)
#  |-- city_name: string (nullable = true)
#  |-- lng: long (nullable = true)
#  |-- lat: long (nullable = true)
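
# Sketch: the schema can also be inspected programmatically via df.schema
for field in df.schema.fields:
    print(field.name, field.dataType)   # e.g. lng LongType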

df.explain(True) # prints the logical and physical execution plan; useful for tuning, similar to the DAG shown in the Spark UI
df.createOrReplaceGlobalTempView('tmp_table_view')   # registerTempTable was deprecated in 2.0; use createOrReplaceTempView / createOrReplaceGlobalTempView instead
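
# Sketch: a global temp view lives in the global_temp database and is visible across sessions
spark.sql("select * from global_temp.tmp_table_view limit 3").show()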

# Creating DataFrames
# row by row, from an RDD
schema = StructType([StructField('id',LongType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])
rdd = sc.parallelize([(1,'Karal',19),(2,'Bob',18)])
data = rdd.toDF(['id','name','age'])
# 
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Karal| 19|
# |  2|  Bob| 18|
# +---+-----+---+
data = spark.createDataFrame(rdd, schema = schema)

# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Karal| 19|
# |  2|  Bob| 18|
# +---+-----+---+
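
# Sketch: the same DataFrame built straight from a Python list of tuples, reusing the explicit schema
data = spark.createDataFrame([(1, 'Karal', 19), (2, 'Bob', 18)], schema=schema)
data.show()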

# column by column, from a pandas DataFrame
ids = [1,2,3]
grades = [3,3,1]
name = ['Karal','Bob','Lily']
age = [19,18,15]
data = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data.show()

# column by column: a second DataFrame for the joins/unions below
ids = [5,2,2]
grades = [2,3,3]
name = ['Maria','Bob','Bob']
age = [17,18,18]
data2 = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data2.show()
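
# Sketch: the reverse direction, collecting a (small) Spark DataFrame back to the driver as pandas
pdf = data.toPandas()
print(pdf.dtypes)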

# Aggregation queries
data.groupBy('grade').agg(F.max('age').alias('max_age'),F.max('id').alias('max_id')).show()
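
# Sketch: several aggregates per group in one pass (column names cnt / avg_age / names are just for this example)
data.groupBy('grade').agg(
    F.count('*').alias('cnt'),
    F.avg('age').alias('avg_age'),
    F.collect_set('name').alias('names')
).show()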

# Custom "UDAF": approach 1 collects each group into a list and reduces it with an ordinary UDF; approach 2 uses a pandas_udf (sketch below)
# Two ways to define a UDF: 1. the @udf decorator (the decorated function can then only be used as a Spark UDF);
#                           2. wrap a plain Python function with F.udf() when needed, e.g. udf_max_new = F.udf(lambda l1: max(l1), IntegerType())
@udf(returnType=IntegerType())
def udf_max(l1):
    return max(l1)

data.groupBy('grade').agg(F.collect_list('age').alias('list_info')).withColumn('max_age',udf_max('list_info'))
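
# Sketch of approach 2: a grouped-aggregate pandas_udf (Spark 2.4+, needs pyarrow installed; pd_max_age is just an illustrative name)
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def pd_max_age(ages):
    return int(ages.max())   # receives the group's ages as a pandas Series, returns one scalar

data.groupBy('grade').agg(pd_max_age('age').alias('max_age')).show()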

# Registering a UDF for use in SQL
def my_add(a):
    return a+1
spark.udf.register('udf_add',my_add,IntegerType())

data.createOrReplaceTempView('data_view')
sqldf = spark.sql("""
    select 
    *,
    udf_add(age) as age_new
    from data_view
""")
sqldf.show()
# +---+-----+---+-----+-------+
# | id| name|age|grade|age_new|
# +---+-----+---+-----+-------+
# |  1|Karal| 19|    3|     20|
# |  2|  Bob| 18|    3|     19|
# |  3| Lily| 15|    1|     16|
# +---+-----+---+-----+-------+
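
# Sketch: a SQL-registered UDF can also be called from the DataFrame API through F.expr
data.withColumn('age_new', F.expr('udf_add(age)')).show()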


data.withColumn('gender',F.lit('female')).withColumn('grade_new',F.col('grade')).withColumnRenamed('grade','grade_old').show()
# +---+-----+---+---------+------+---------+
# | id| name|age|grade_old|gender|grade_new|
# +---+-----+---+---------+------+---------+
# |  1|Karal| 19|        3|female|        3|
# |  2|  Bob| 18|        3|female|        3|
# |  3| Lily| 15|        1|female|        1|
# +---+-----+---+---------+------+---------+

data.join(data2,on='id',how='left').show()
# +---+-----+---+-----+----+----+-----+
# | id| name|age|grade|name| age|grade|
# +---+-----+---+-----+----+----+-----+
# |  1|Karal| 19|    3|null|null| null|
# |  3| Lily| 15|    1|null|null| null|
# |  2|  Bob| 18|    3| Bob|  18|    3|
# |  2|  Bob| 18|    3| Bob|  18|    3|
# +---+-----+---+-----+----+----+-----+
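
# Sketch: the duplicated column names above can be avoided by renaming the right side before joining
# (data2_renamed and the *_2 column names are just for this example)
data2_renamed = data2.select(
    'id',
    F.col('name').alias('name2'),
    F.col('age').alias('age2'),
    F.col('grade').alias('grade2')
)
data.join(data2_renamed, on='id', how='left').show()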

data.union(data2).show()
# +---+-----+---+-----+
# | id| name|age|grade|
# +---+-----+---+-----+
# |  1|Karal| 19|    3|
# |  2|  Bob| 18|    3|
# |  3| Lily| 15|    1|
# |  5|Maria| 17|    2|
# |  2|  Bob| 18|    3|
# |  2|  Bob| 18|    3|
# +---+-----+---+-----+
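
# Sketch: union() matches columns by position; unionByName() (Spark 2.3+) matches by column name,
# and dropDuplicates() removes the repeated Bob rows after the union
data.unionByName(data2).dropDuplicates().show()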


data.where((F.col('age')>=18) & (F.col('age')<19)).show()   # where() and filter() are aliases
data.filter("age == 18").show()                              # filter() also accepts a SQL expression string
data.withColumn('age_string',F.col('age').cast('string')).show()   # cast a column to another type
data.drop('grade').show()                                    # drop a column
data.withColumn('is_adult',F.when(F.col("age")<18,0).when(F.col("age")==18,1).otherwise(2)).show()   # multi-branch conditional
data.dropDuplicates()                                        # drop duplicate rows (returns a new DataFrame; add .show() to display it)
data.withColumn("rank_age",F.row_number().over(Window.partitionBy('grade').orderBy(F.col("age").desc()))).show()   # rank rows within each grade by descending age

# Combining and splitting columns
data.withColumn('info_arr',F.array(['id','name','age','grade'])).show()
data.withColumn('info_concat',F.concat_ws('#','id','name','age')).show()   # note: .show() returns None, so don't assign its result
# +---+-----+---+-----+-----------------+
# | id| name|age|grade|         info_arr|
# +---+-----+---+-----+-----------------+
# |  1|Karal| 19|    3|[1, Karal, 19, 3]|
# |  2|  Bob| 18|    3|  [2, Bob, 18, 3]|
# |  3| Lily| 15|    1| [3, Lily, 15, 1]|
# +---+-----+---+-----+-----------------+
# +---+-----+---+-----+-----------+
# | id| name|age|grade|info_concat|
# +---+-----+---+-----+-----------+
# |  1|Karal| 19|    3| 1#Karal#19|
# |  2|  Bob| 18|    3|   2#Bob#18|
# |  3| Lily| 15|    1|  3#Lily#15|
# +---+-----+---+-----+-----------+

data_concat = data.withColumn('info_concat',F.concat_ws('#','id','name','age'))
data_concat.withColumn('info_explode_id',F.explode(F.split('info_concat','#'))).show()

# +---+-----+---+-----+-----------+---------------+
# | id| name|age|grade|info_concat|info_explode_id|
# +---+-----+---+-----+-----------+---------------+
# |  1|Karal| 19|    3| 1#Karal#19|              1|
# |  1|Karal| 19|    3| 1#Karal#19|          Karal|
# |  1|Karal| 19|    3| 1#Karal#19|             19|
# |  2|  Bob| 18|    3|   2#Bob#18|              2|
# |  2|  Bob| 18|    3|   2#Bob#18|            Bob|
# |  2|  Bob| 18|    3|   2#Bob#18|             18|
# |  3| Lily| 15|    1|  3#Lily#15|              3|
# |  3| Lily| 15|    1|  3#Lily#15|           Lily|
# |  3| Lily| 15|    1|  3#Lily#15|             15|
# +---+-----+---+-----+-----------+---------------+
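
# Sketch: splitting without exploding, picking out individual pieces by index (id_part / name_part are just illustrative names)
data_concat.withColumn('id_part', F.split('info_concat', '#').getItem(0)) \
           .withColumn('name_part', F.split('info_concat', '#').getItem(1)) \
           .show()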
