PySpark

作者: 奇而思 | 来源:发表于2020-01-16 17:10 被阅读0次

    在PySpark中使用现有列创建新列

    假如现在有如下DataFrame:


    image.png

    创建新列,使其变成这样


    image.png
    做法:
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    
    sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
    data = [('x1-y1', 3,'z1'),
            ('x2-y2', 2,'z2'),
            ('x3-y3', 1,'z3')]
    test_df = sqlContext.createDataFrame(data, schema=['_1', '_2', '_3'])
    
    test_df = test_df.withColumn('_4', F.regexp_replace('_1', '-', ''))
    test_df = test_df.withColumn('_5', F.concat(F.regexp_replace('_1', '-', '='),F.lit('='),F.col('_3')))
    test_df.show()
    

    这里使用了pyspark.sql.functions中的concat, regexp_replace, lit, col等函数

    StringIndexer对多列进行使用

    目前只能通过pipeline的形式

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer
    
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['date'])) ]
    
    
    pipeline = Pipeline(stages=indexers)
    df_r = pipeline.fit(df).transform(df)
    
    df_r.show()
    

    相关文章

      网友评论

          本文标题:PySpark

          本文链接:https://www.haomeiwen.com/subject/fnnfzctx.html