美文网首页
pyspark案例系列7-通过dataframe的pivot实现

pyspark案例系列7-通过dataframe的pivot实现

作者: 只是甲 | 来源:发表于2022-06-14 08:58 被阅读0次

    一. 需求

    今天一个朋友咨询我,spark是否可以实现自动的行转列的功能。

    例如数据格式如下:


    image.png

    需要形成一个如下的矩阵:


    image.png

    二. 解决方案

    我们知道关系型数据库里面有一个pivot可以比较方便的实现行转列,翻看了hive、Spark SQL的官网文档,没有找到pivot函数。

    但是dataframe居然支持,真的是厉害了。

    数据准备:
    hive端数据准备

    use test;
    drop table if exists test1;
    create table test1(user_id varchar(50),prod_id int);
    insert into test1 values ('A',50);
    insert into test1 values ('A',80);
    insert into test1 values ('A',100);
    insert into test1 values ('A',200);
    insert into test1 values ('A',500);
    insert into test1 values ('B',100);
    insert into test1 values ('B',200);
    insert into test1 values ('B',120);
    insert into test1 values ('B',300);
    insert into test1 values ('C',120);
    insert into test1 values ('C',110);
    insert into test1 values ('C',90);
    insert into test1 values ('C',5);
    insert into test1 values ('D',5);
    insert into test1 values ('D',10);
    insert into test1 values ('D',8);
    insert into test1 values ('D',20);
    

    代码:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat_ws
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    sql1 = "select t1.user_id user_id1," \
           " t2.user_id user_id2, " \
           "t1.prod_id " \
      "from test.test1 t1 " \
     "left join test.test1 t2 " \
     "on t1.prod_id = t2.prod_id " \
     "order by t2.user_id" 
    
    
    
    df1 = spark.sql(sql1)
    df2 = df1.where('user_id1 != user_id2')
    
    
    df3 = df2.groupBy("user_id1") \
             .pivot("user_id2") \
             .agg({'prod_id':'count'}) \
             .orderBy(df2.user_id1.asc()) \
             .na.fill(0) 
    
    # write csv
    # df3.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("hdfs://hp1:8020/user/juzhen")
    
    # write text
    df4 = df3.select(concat_ws(',',*df3.columns).alias('data'))
    df4.coalesce(1).write.format("text").save("hdfs://hp1:8020/user/juzhen")
    

    测试记录:

    image.png

    相关文章

      网友评论

          本文标题:pyspark案例系列7-通过dataframe的pivot实现

          本文链接:https://www.haomeiwen.com/subject/cgnslltx.html