https://github.com/spark-examples/pyspark-examples
pivot
df = spark.createDataFrame([('Joe', '70000'), ('Joe', '70000'),( 'Henry', '80000')],
['Name', 'Sallary'])
+-----+-------+
| Name|Sallary|
+-----+-------+
| Joe| 70000|
| Joe| 70000|
|Henry| 80000|
+-----+-------+
df.selectExpr("row_number() over (order by Name) as a","Name","Sallary").show()
+---+-----+-------+
| a| Name|Sallary|
+---+-----+-------+
| 1|Henry| 80000|
| 2| Joe| 70000|
| 3| Joe| 70000|
+---+-----+-------+
df.selectExpr("concat(cast(a as string),'aaa')").show()
from pyspark.sql.functions import first
df.groupBy().pivot("Name").agg(first("Sallary")).show()
joined_group_df = spark.sql("""
select id,collect_set(address)[0] as address,collect_set(address_normal)[0] as address_normal,
concat_ws(",",collect_set(id2)) as id2,
collect_set(address2)[0] as address2,
collect_set(address_normal2)[0] as address_normal2,
concat_ws(",",collect_set(longitude2)) as longitude2,
concat_ws(",",collect_set(latitude2)) as latitude2
from joined_df group by id
""").cache()
dataframe udf
from pyspark.sql.types import StructType, StringType, StructField, IntegerType,ArrayType
@udf(returnType=ArrayType(StringType()))
def lonlat_split(address: str):
return address[:-1].split(",")
@udf(returnType=StringType())
def get_centroid(lons: list, lats: list):
points = []
for lon, lat in zip(lons, lats):
points.append(Point(float(lon), float(lat)))
return MultiPoint(points).centroid.wkt
df2 = df.withColumn("lon",lonlat_split(df.district_pos)[0])
df2 = df2.withColumn("lat",lonlat_split(df.district_pos)[1])
df2.select(['lon','lat']).show(2)
+----------+---------+
| lon| lat|
+----------+---------+
|109.268983|22.687409|
|108.810373|23.217889|
+----------+---------+
only showing top 2 rows
from pyspark.sql import functions as fn
df.groupBy(['_1']).agg(fn.collect_list('_2').alias('list')).show(5)
+---+------+
| _1| list|
+---+------+
| 1|[2, 3]|
| 2| [3]|
+---+------+
zipWithIndex
df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
In [50]: df1 = df0.rdd.zipWithIndex()
In [51]: df1.take(3)
Out[51]: [(Row(col1=1), 0), (Row(col1=2), 1), (Row(col1=3), 2)]
In [56]: df1.map(lambda x:Row(a=x[0]['col1'],id=x[1])).toDF().show()
+---+---+
| a| id|
+---+---+
| 1| 0|
| 2| 1|
| 3| 2|
| 1| 3|
| 2| 4|
| 3| 5|
+---+---+
rdd_zip = rdd.zipWithIndex().map(lambda x: (x[1], *x[0]))
rdd_zip = rdd.zipWithIndex().map(lambda x: {"id": x[1], **x[0].asDict()})
join
In [16]: e1.alias('ee1').join(e2.alias('ee2'),[col('ee1.emp_id') == col('ee2.emp
...: _id')],'left').select(col('ee1.emp_id'),col('ee2.emp_id').alias('id2'))
...: .show()
+------+----+
|emp_id| id2|
+------+----+
| 6|null|
| 5|null|
| 1| 1|
| 3|null|
| 2| 2|
| 4|null|
+------+----+
r1 = sc.parallelize([(1,100),(2,200),(3,300),(4,400)])
r2 = sc.parallelize([(2,(22,0)),(4,(44,1)),(5,(55,0)),(6,(66,1))])
r3 = sc.parallelize([(2,22,0),(4,44,1),(5,55,0),(6,66,1)])
>>> r1.leftOuterJoin(r2).collect()
[(1, (100, None)), (2, (200, (22, 0))), (3, (300, None)), (4, (400, (44, 1)))]
>>> df1 = spark.createDataFrame(r1,['id','amount'])
>>> df1.show()
+---+------+
| id|amount|
+---+------+
| 1| 100|
| 2| 200|
| 3| 300|
| 4| 400|
+---+------+
>>> df3 = spark.createDataFrame(r3,['id','weight','exist'])
>>> df3.show()
+---+------+-----+
| id|weight|exist|
+---+------+-----+
| 2| 22| 0|
| 4| 44| 1|
| 5| 55| 0|
| 6| 66| 1|
+---+------+-----+
SELECT a.* FROM product a LEFT JOIN product_details b
ON a.id=b.id AND b.weight!=44 AND b.exist=0
WHERE b.id IS NULL;
>>> df4 = df1.join(df3,[df1.id == df3.id , df3.weight!=44 , df3.exist==0],'left')
>>> df4.filter('weight is null' ).show()
+---+------+----+------+-----+
| id|amount| id|weight|exist|
+---+------+----+------+-----+
| 1| 100|null| null| null|
| 3| 300|null| null| null|
| 4| 400|null| null| null|
+---+------+----+------+-----+
agg
https://www.cnblogs.com/seekerjunyu/p/14016240.html
1、groupby
from pyspark.sql.functions import first, collect_list, mean
In:
df.groupBy("ID").agg(mean("P"), first("index"),
first("xinf"), first("xup"),
first("yinf"), first("ysup"),
collect_list("M"))
2、dataframe
df.agg(mean('label')).show()
+------------------+
| avg(label)|
+------------------+
|0.7411402157164869|
+------------------+
agglist = [mean(x) for x in tips_.columns]
agglist
Out[109]: [Column<b'avg(total_bill)'>, Column<b'avg(tip)'>, Column<b'avg(size)'>]
tips_.agg(*agglist).show()
+------------------+----------------+-----------------+
| avg(total_bill)| avg(tip)| avg(size)|
+------------------+----------------+-----------------+
|19.785942643392282|2.99827868821191|2.569672131147541|
+------------------+----------------+-----------------+
df_group = df.groupBy('building') \
.agg(fn.collect_list('hn_id').alias('hn_id'), fn.collect_list('building_num').alias('building_nums'))
df_group.foreach(gen_cp)
column
from pyspark.sql.functions import concat_ws,current_date,when,col
@udf(returnType=StringType())
def strQ2B(ustring):
if ustring:
ss = []
for s in ustring:
rstring = ""
for uchar in s:
inside_code = ord(uchar)
if inside_code == 12288: # 全角空格直接转换
inside_code = 32
elif (inside_code >= 65281 and inside_code <= 65374): # 全角字符(除空格)根据关系转化
inside_code -= 65248
rstring += chr(inside_code)
ss.append(rstring)
return "".join(ss)
return ustring
df2 = df2.withColumn("address", strQ2B(col("address")))
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df3 = df.withColumn("salary",col("salary")*100)
df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df5 = df.withColumn("Country", lit("USA"))
df.withColumnRenamed("gender","sex")
df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df_r=df.withColumn('row_number',sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)).alias("rowNum"))
df.withColumn("name", concat_ws(",","firstname",'lastname')) .show()
df.withColumn("current_date", current_date()) .show()
df.withColumn("grade", \
when((df.salary < 4000), lit("A")) \
.when((df.salary >= 4000) & (df.salary <= 5000), lit("B")) \
.otherwise(lit("C")) \
).show()
df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")
df.withColumn("salary",df.salary.cast('double')).printSchema()
df.withColumn("salary",df.salary.cast(DoublerType())).printSchema()
df.withColumn("salary",col("salary").cast('double')).printSchema()
groupby
df_group = df.groupBy('building') \
.agg(fn.collect_list('hn_id').alias('hn_id'), fn.collect_list('building_num').alias('building_nums'))
df_group.foreach(gen_cp)
def generate_parent_children(row):
result = []
building, hnid_nums = row[0], row[1]
parent = []
children = []
for hnid_num in hnid_nums:
hnid, num = hnid_num.split('|')
if num == '-1':
parent.append(hnid)
else:
children.append(hnid)
if parent and children:
if len(parent) > 1:
print("+" * 10)
child_str = ",".join(children)
parent_str = ",".join(parent)
for p in parent:
result.append(p, None, child_str)
for c in children:
result.append(c, parent_str, None)
return result
rdd = df.rdd.map(lambda x: (x['building'], x['hn_id'] + '|' + x['building_num'] if x['building_num'] else "-1"))
parent_children_rdd = rdd.groupByKey().flatMap(generate_parent_children)
select
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp, var_pop
from pyspark.sql.functions import col,expr
df.select(df.colRegex("`^.*name*`")).show()
# Using split() function of Column class
split_col = pyspark.sql.functions.split(df['dob'], '-')
df3 = df.select("firstname", "middlename", "lastname", "dob", split_col.getItem(0).alias('year'),
split_col.getItem(1).alias('month'), split_col.getItem(2).alias('day'))
df3.show(truncate=False)
from pyspark.sql.functions import col,expr
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
spark.createDataFrame(data).toDF("date","increment") \
.select(col("date"),col("increment"), \
expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")) \
.show()
print("approx_count_distinct: " + \
str(df.select(approx_count_distinct("salary")).collect()[0][0]))
print("avg: " + str(df.select(avg("salary")).collect()[0][0]))
df.select(collect_list("salary")).show(truncate=False)
df.select(collect_set("salary")).show(truncate=False)
df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate=False)
print("Distinct Count of Department & Salary: "+str(df2.collect()[0][0]))
print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
.show(truncate=False)
array_string
from pyspark.sql.functions import col, concat_ws
+----------------+------------------+------------+
|name |languagesAtSchool |currentState|
+----------------+------------------+------------+
|James,,Smith |[Java, Scala, C++]|CA |
|Michael,Rose, |[Spark, Java, C++]|NJ |
|Robert,,Williams|[CSharp, VB] |NV |
+----------------+------------------+------------+
df2 = df.withColumn("languagesAtSchool",
concat_ws(",",col("languagesAtSchool")))
df2.printSchema()
df2.show(truncate=False)
df.createOrReplaceTempView("ARRAY_STRING")
spark.sql("select name, concat_ws(',',languagesAtSchool) as languagesAtSchool," + \
" currentState from ARRAY_STRING") \
.show(truncate=False)
ArrayType
from pyspark.sql.types import StringType, ArrayType,StructType,StructField
arrayCol = ArrayType(StringType(),False)
data = [
("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]
schema = StructType([
StructField("name",StringType(),True),
StructField("languagesAtSchool",ArrayType(StringType()),True),
StructField("languagesAtWork",ArrayType(StringType()),True),
StructField("currentState", StringType(), True),
StructField("previousState", StringType(), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
from pyspark.sql.functions import explode
df.select(df.name,explode(df.languagesAtSchool)).show()
+----------------+------+
| name| col|
+----------------+------+
| James,,Smith| Java|
| James,,Smith| Scala|
| James,,Smith| C++|
| Michael,Rose,| Spark|
| Michael,Rose,| Java|
| Michael,Rose,| C++|
|Robert,,Williams|CSharp|
|Robert,,Williams| VB|
+----------------+------+
from pyspark.sql.functions import split
df.select(split(df.name,",").alias("nameAsArray")).show()
from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()
from pyspark.sql.functions import array_contains
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
.alias("array_contains")).show()
broadcast
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James |Smith |USA |CA |
|Michael |Rose |USA |NY |
|Robert |Williams|USA |CA |
|Maria |Jones |USA |FL |
+---------+--------+-------+-----+
def state_convert(code):
return broadcastStates.value[code]
result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)
+---------+--------+-------+----------+
|firstname|lastname|country|state |
+---------+--------+-------+----------+
|James |Smith |USA |California|
|Michael |Rose |USA |New York |
|Robert |Williams|USA |California|
|Maria |Jones |USA |Florida |
+---------+--------+-------+----------+
# Broadcast variable on filter
filteDf= df.where((df['state'].isin(broadcastStates.value)))
column
df.sort(df.fname.asc()).show()
#between
df.filter(df.id.between(100,300)).show()
#contains
df.filter(df.fname.contains("Cruise")).show()
#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()
#like , rlike
df.select(df.fname,df.lname,df.id) \
.filter(df.fname.like("%om"))
#substr
df.select(df.fname.substr(1,2).alias("substr")).show()
#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
.filter(df.id.isin(li)) \
.show()
schema = StructType([
StructField('name', StructType([
StructField('fname', StringType(), True),
StructField('lname', StringType(), True)])
),
StructField('languages', ArrayType(StringType()),True),
StructField('properties', MapType(StringType(),StringType()),True)
])
+--------------+---------------+-----------------------------+
| name| languages| properties|
+--------------+---------------+-----------------------------+
| [James, Bond]| [Java, C#]|[eye -> brown, hair -> black]|
| [Ann, Varsa]| [.NET, Python]|[eye -> black, hair -> brown]|
|[Tom Cruise, ]|[Python, Scala]| [eye -> grey, hair -> red]|
| [Tom Brand,]| [Perl, Ruby]| [eye -> blue, hair -> black]|
+--------------+---------------+-----------------------------+
#getItem()
df.select(df.languages.getItem(1)).show()
df.select(df.properties.getItem("hair")).show()
#getField from Struct or Map
df.select(df.properties.getField("hair")).show()
df.select(df.name.getField("fname")).show()
#dropFields
#from pyspark.sql.functions import col
#df.withColumn("name1",col("name").dropFields(["fname"])).show()
#withField
#from pyspark.sql.functions import lit
#df.withColumn("name",df.name.withField("fname",lit("AA"))).show()
#from pyspark.sql import Row
#from pyspark.sql.functions import lit
#df = spark.createDataFrame([Row(a=Row(b=1, c=2))])
#df.withColumn('a', df['a'].withField('b', lit(3))).select('a.b').show()
#from pyspark.sql import Row
#from pyspark.sql.functions import col, lit
#df = spark.createDataFrame([
#Row(a=Row(b=1, c=2, d=3, e=Row(f=4, g=5, h=6)))])
#df.withColumn('a', df['a'].dropFields('b')).show()
from pyspark.sql.functions import col,lit,create_map
df = df.withColumn("propertiesMap",create_map(
lit("salary"),col("salary"),
lit("location"),col("location")
)).drop("salary","location")
create dataframe
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()
dfFromData2 = spark.createDataFrame(data).toDF(*columns)
dfFromData2.printSchema()
rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData,columns)
dfFromData3.printSchema()
date current_timestamp
.getOrCreate()
data=[["1"]]
df=spark.createDataFrame(data,["id"])
from pyspark.sql.functions import *
#current_date() & current_timestamp()
df.withColumn("current_date",current_date()) \
.withColumn("current_timestamp",current_timestamp()) \
.show(truncate=False)
#SQL
spark.sql("select current_date(), current_timestamp()") \
.show(truncate=False)
# Date & Timestamp into custom format
df.withColumn("date_format",date_format(current_date(),"MM-dd-yyyy")) \
.withColumn("to_timestamp",to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS")) \
.show(truncate=False)
#SQL
spark.sql("select date_format(current_date(),'MM-dd-yyyy') as date_format ," + \
"to_timestamp(current_timestamp(),'MM-dd-yyyy HH mm ss SSS') as to_timestamp") \
.show(truncate=False)
from pyspark.sql.functions import *
df=spark.createDataFrame([["1"]],["id"])
df.select(current_date().alias("current_date"), \
date_format(current_date(),"yyyy MM dd").alias("yyyy MM dd"), \
date_format(current_timestamp(),"MM/dd/yyyy hh:mm").alias("MM/dd/yyyy"), \
date_format(current_timestamp(),"yyyy MMM dd").alias("yyyy MMMM dd"), \
date_format(current_timestamp(),"yyyy MMMM dd E").alias("yyyy MMMM dd E") \
).show()
#SQL
spark.sql("select current_date() as current_date, "+
"date_format(current_timestamp(),'yyyy MM dd') as yyyy_MM_dd, "+
"date_format(current_timestamp(),'MM/dd/yyyy hh:mm') as MM_dd_yyyy, "+
"date_format(current_timestamp(),'yyyy MMM dd') as yyyy_MMMM_dd, "+
"date_format(current_timestamp(),'yyyy MMMM dd E') as yyyy_MMMM_dd_E").show()
date functions
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()
from pyspark.sql.functions import *
#current_date()
df.select(current_date().alias("current_date")
).show(1)
#date_format()
df.select(col("input"),
date_format(col("input"), "MM-dd-yyyy").alias("date_format")
).show()
#to_date()
df.select(col("input"),
to_date(col("input"), "yyy-MM-dd").alias("to_date")
).show()
#datediff()
df.select(col("input"),
datediff(current_date(),col("input")).alias("datediff")
).show()
#months_between()
df.select(col("input"),
months_between(current_date(),col("input")).alias("months_between")
).show()
#trunc()
df.select(col("input"),
trunc(col("input"),"Month").alias("Month_Trunc"),
trunc(col("input"),"Year").alias("Month_Year"),
trunc(col("input"),"Month").alias("Month_Trunc")
).show()
#add_months() , date_add(), date_sub()
df.select(col("input"),
add_months(col("input"),3).alias("add_months"),
add_months(col("input"),-3).alias("sub_months"),
date_add(col("input"),4).alias("date_add"),
date_sub(col("input"),4).alias("date_sub")
).show()
#
df.select(col("input"),
year(col("input")).alias("year"),
month(col("input")).alias("month"),
next_day(col("input"),"Sunday").alias("next_day"),
weekofyear(col("input")).alias("weekofyear")
).show()
df.select(col("input"),
dayofweek(col("input")).alias("dayofweek"),
dayofmonth(col("input")).alias("dayofmonth"),
dayofyear(col("input")).alias("dayofyear"),
).show()
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)
#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")
).show(1,truncate=False)
#to_timestamp()
df2.select(col("input"),
to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp")
).show(truncate=False)
#hour, minute,second
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])
df3.select(col("input"),
hour(col("input")).alias("hour"),
minute(col("input")).alias("minute"),
second(col("input")).alias("second")
).show(truncate=False)
window func
simpleData = (("James", "Sales", 3000), \
("Michael", "Sales", 4600), \
("Robert", "Sales", 4100), \
("Maria", "Finance", 3000), \
("James", "Sales", 3000), \
("Scott", "Finance", 3300), \
("Jen", "Finance", 3900), \
("Jeff", "Marketing", 3000), \
("Kumar", "Marketing", 2000), \
("Saif", "Sales", 4100) \
)
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number", row_number().over(windowSpec)) \
.show(truncate=False)
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank", dense_rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
.show()
from pyspark.sql.functions import ntile
df.withColumn("ntile", ntile(2).over(windowSpec)) \
.show()
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist", cume_dist().over(windowSpec)) \
.show()
from pyspark.sql.functions import lag
df.withColumn("lag", lag("salary", 2).over(windowSpec)) \
.show()
from pyspark.sql.functions import lead
df.withColumn("lead", lead("salary", 2).over(windowSpec)) \
.show()
windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col, avg, sum, min, max, row_number
df.withColumn("row", row_number().over(windowSpec)) \
.withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
.withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
.withColumn("min", min(col("salary")).over(windowSpecAgg)) \
.withColumn("max", max(col("salary")).over(windowSpecAgg)) \
.where(col("row") == 1).select("department", "avg", "sum", "min", "max") \
.show()
网友评论