美文网首页
pyspark udf

pyspark udf

作者: hehehehe | 来源:发表于2022-01-17 17:37 被阅读0次

udf lambda

df4 = df3.withColumn("hn_id3", udf(lambda x: x[0],StringType())(df3.ids))

group_df = df1.groupBy(['house_num_normal']).agg(fn.collect_list('lon').alias('lons'),
                                                     fn.collect_list('lat').alias('lats'),
                                                     fn.collect_list('id2')[0].alias('id'))
group_df.withColumn('centroid', get_centroid(group_df.lons, group_df.lats))

@udf(returnType=StringType())
def get_wkt(lons: list, lats: list):
    points = []
    for lon, lat in zip(lons, lats):
        points.append(Point(float(lon), float(lat)))
    return MultiPoint(points).wkt


@udf(returnType=ArrayType(StringType()))
def lonlat_split(address: str):
    return address[:-1].split(",")


@udf(returnType=ArrayType(FloatType()))
def lonlat_to_list(lon: float, lat: float):
    return [lon, lat]


@udf(returnType=StringType())
def points_hull(points):
    return MultiPoint(points).convex_hull.wkt


@udf(returnType=StringType())
def polygon_centroid(polygon_wkt):
    return wkt.loads(polygon_wkt).centroid.wkt

@udf(returnType=StringType())
def get_centroid(lons: list, lats: list):
    points = []
    for lon, lat in zip(lons, lats):
        points.append(Point(float(lon), float(lat)))
    return MultiPoint(points).centroid.wkt


@udf(returnType=FloatType())
def get_distance(confidence_list: list, lons: list, lats: list):
    if 5 in confidence_list:
        main_idx = confidence_list.index(5)
        main_lon, main_lat = lons[main_idx], lats[main_idx]
        max_dist = 0
        for i, confidence in enumerate(confidence_list):
            if i != main_idx:
                dist = distance.geodesic((float(main_lat), float(main_lon), 0),
                                         (float(lats[i]), float(lons[i]), 0)).meters
                if dist > max_dist:
                    max_dist = dist
        return max_dist

    return -1.0

df2 = spark.sql("""
        select group_key,concat_ws(",",hn_id_list) as hn_id_str,concat_ws(",",confidence_list) as confidence_str,confidence_list,
        lon_list, lat_list 
        from hn_table3
        where hn_id_len > 1
    """)

相关文章

  • PySpark(用户定义函数)

    来源:https://sparkbyexamples.com/pyspark/pyspark-udf-user-d...

  • pyspark udf

    udf lambda

  • Introducing Pandas UDF for PySpa

    Introducing Pandas UDF for PySpark 更新:此博客于 2018 年 2 月 22 ...

  • PySpark pandas udf

    配置 所有运行节点安装 pyarrow ,需要 >= 0.8 为什么会有 pandas UDF 在过去的几年中,p...

  • pyspark pandas udf

    下面介绍pandas_udf的三种函数使用 Scalar UDFs函数 通常做一些简单计算使用该函数,输出长度与输...

  • PySpark UDF 简要教程

    最简单的注册UDF ---- 直接将lambda表达式注册成UDF下面是一个简单的清洗函数 结果 很多时候逻辑比较...

  • pyspark小技巧

    1. pyspark添加列,并向udf中传递多个参数 场景:现在有个keyword的list,需要对输入的每行数据...

  • pyspark: sql.functions以及udf函数

    大纲 选取列 select 常数列 lit 条件分支 when otherwise 数学函数 时间函数 窗口函数 ...

  • Spark Python API Docs(part one)

    pyspark package subpackages pyspark.sql module pyspark.st...

  • pyspark整理

    pyspark入门资料 公众号回复:pyspark (会有pyspark资料大礼包:Learning PySpar...

网友评论

      本文标题:pyspark udf

      本文链接:https://www.haomeiwen.com/subject/ysuchrtx.html