GeoSpark -> PointRDD (Python version)

Author: 一个懒散的人 | Published 2020-12-03 11:22

    [Unless otherwise noted, the examples in this series are Scala or Java; Python versions, like this one, are explicitly marked.]

    • Main functionality covered by this test:
      a. Read the checkin.csv data with PointRDD, producing a spatial RDD
      b. Convert the spatial RDD into a spatial DataFrame via Adapter.toDf(spatialRDD, spark)
      c. Filter the spatial DataFrame on a condition (a DataFrame-only alternative is sketched right after this list)
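
    For comparison, the same pipeline can also stay entirely on the DataFrame side, building the geometry with GeoSpark's SQL constructor instead of going through PointRDD and Adapter.toDf. A minimal sketch, assuming the SparkSession has been created, registered with GeoSparkRegistrator.registerAll(spark), and that input_location points at checkin.csv, exactly as set up in section 2 below; the CAST-to-Decimal form of ST_Point follows the GeoSpark 1.3 SQL docs:

    raw_df = spark.read.csv(input_location)  # columns _c0 = longitude, _c1 = latitude, _c2 = category
    raw_df.createOrReplaceTempView("checkin_raw")
    point_df = spark.sql("""
        SELECT ST_Point(CAST(_c0 AS Decimal(24, 20)),
                        CAST(_c1 AS Decimal(24, 20))) AS geometry,
               _c2 AS category
        FROM checkin_raw
    """)
    point_df.filter("category = 'hotel'").show(truncate=False)
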
    1. Test data (checkin.csv):
    -88.331492,32.324142,hotel
    -88.175933,32.360763,gas
    -88.388954,32.357073,bar
    -88.221102,32.35078,restaurant
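
    To reproduce the run locally, the four sample rows can be written to disk with a few lines of plain Python; the path below simply matches the input_location used in section 2, so adjust it for your machine:

    rows = [
        "-88.331492,32.324142,hotel",
        "-88.175933,32.360763,gas",
        "-88.388954,32.357073,bar",
        "-88.221102,32.35078,restaurant",
    ]
    with open(r"D:\pycharm\pythonProject\GeoSpark\data\checkin.csv", "w") as f:
        f.write("\n".join(rows) + "\n")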
    

    2. Test code (Python):

    import findspark  # note: without this package the geospark imports may fail with errors
    findspark.init() 
    
    from pyspark import StorageLevel
    from pyspark.sql import SparkSession
    from geospark.core.SpatialRDD import PointRDD
    from geospark.core.enums import FileDataSplitter
    from geospark.register import GeoSparkRegistrator
    from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer
    from geospark.utils.adapter import Adapter
    
    # Initialize the Spark session with GeoSpark's Kryo serializer and registrator
    spark = SparkSession.builder\
        .config("spark.serializer", KryoSerializer.getName)\
        .config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName)\
        .getOrCreate()
    
    GeoSparkRegistrator.registerAll(spark)
    
    
    input_location = r"D:\pycharm\pythonProject\GeoSpark\data\checkin.csv"
    offset = 0  # The point long/lat starts from Column 0
    splitter = FileDataSplitter.CSV # FileDataSplitter enumeration
    carry_other_attributes = True  # Carry Column 2 (hotel, gas, bar...)
    level = StorageLevel.MEMORY_ONLY # Storage level from pyspark
    s_epsg = "epsg:4326" # Source epsg code
    t_epsg = "epsg:5070" # target epsg code
    
    
    # Three equivalent ways to build the PointRDD; each assignment overwrites the previous one,
    # so only the last constructor call below is actually used by Adapter.toDf.

    # 1) Positional arguments: source file, column offset of the point, splitter, carry extra columns
    point_rdd = PointRDD(spark.sparkContext, input_location, offset, splitter, carry_other_attributes)

    # 2) Overload that additionally sets the storage level and the source/target EPSG codes (CRS transform)
    point_rdd = PointRDD(spark.sparkContext, input_location, splitter, carry_other_attributes, level, s_epsg, t_epsg)

    # 3) Same as 1), but with keyword arguments
    point_rdd = PointRDD(
        sparkContext=spark.sparkContext,
        InputLocation=input_location,
        Offset=offset,
        splitter=splitter,
        carryInputData=carry_other_attributes
    )
    
    # Convert the spatial RDD to a DataFrame; the geometry lands in the "geometry" column
    # and the carried attribute (hotel, gas, ...) in "_c1"
    df = Adapter.toDf(spatialRDD=point_rdd, sparkSession=spark)
    df.show(truncate=False)

    # Filter the spatial DataFrame with plain Spark SQL
    df.createOrReplaceTempView("p_view")
    rslt = spark.sql("select * from p_view where p_view._c1 = 'hotel'")
    rslt.show(truncate=False)
    
    3. Test results:
    +----------------------------+----------+
    |geometry                    |_c1       |
    +----------------------------+----------+
    |POINT (-88.331492 32.324142)|hotel     |
    |POINT (-88.175933 32.360763)|gas       |
    |POINT (-88.388954 32.357073)|bar       |
    |POINT (-88.221102 32.35078) |restaurant|
    +----------------------------+----------+
    
    +----------------------------+-----+
    |geometry                    |_c1  |
    +----------------------------+-----+
    |POINT (-88.331492 32.324142)|hotel|
    +----------------------------+-----+
    
    
    Process finished with exit code 0
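
    As a possible next step, the point_rdd built above can also be queried spatially without converting to a DataFrame at all. A minimal range-query sketch, assuming the RangeQuery and Envelope classes of the geospark 1.3.x Python package; the window coordinates are arbitrary values chosen to cover the four test points:

    from geospark.core.geom.envelope import Envelope
    from geospark.core.spatialOperator import RangeQuery

    # Bounding box covering the sample data: Envelope(min_x, max_x, min_y, max_y)
    window = Envelope(-89.0, -88.0, 32.0, 33.0)
    consider_boundary_intersection = True  # also keep points that touch the window boundary
    using_index = False                    # no spatial index was built on point_rdd
    matches = RangeQuery.SpatialRangeQuery(point_rdd, window, consider_boundary_intersection, using_index)
    print(matches.count())  # should report 4 for the sample data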
    

    Feel free to reach out if you have any questions.
