import geopandas as gpd
import numpy as np
from scipy.spatial import cKDTree
from shapely.geometry import MultiPolygon, Polygon
def find_near_blocks(tree, target_small_block, kdtree_search_radius):
target_centroid = (target_small_block.centroid.x, target_small_block.centroid.y)
nearest_indices = tree.query_ball_point(target_centroid, kdtree_search_radius)
# distance, nearest_idx = tree.query(target_centroid)
return nearest_indices
def cal_compactness(polygon):
perimeter = polygon.length
area = polygon.area
compactness = 4 * 3.141592653 * area / (perimeter ** 2) # 4 * pi * S /(L^2)
return compactness
def eliminate_small_blocks(gdf, small_cut_th=9000000, kdtree_search_radius=5000):
"""
:param gdf: geodataframe
:param small_cut_th:
:param kdtree_search_radius:
:return: geodataframe类型,小地块消除之后数据
"""
gdf.to_crs("EPSG:32649", inplace=True)
large_blocks = gdf[gdf['geometry'].area >= small_cut_th] # 面积大于等于 9000000 为大地块
small_blocks = gdf[gdf['geometry'].area < small_cut_th] # 面积小于 9000000 为小地块
print('大地块数量,小地块数量', large_blocks.shape[0], small_blocks.shape[0])
tree = cKDTree(np.array(large_blocks['geometry'].apply(lambda geom: (geom.centroid.x, geom.centroid.y)).tolist()))
# 对每个小地块进行融合操作:融合进"融合后"形状最规整的地块
large_blocks_copy = large_blocks.copy() # 必须复制后才能正常修改
eliminate_cnt = 0
for idx, small_block in small_blocks.iterrows():
nearest_indices = find_near_blocks(tree, small_block['geometry'], kdtree_search_radius)
near_blocks = large_blocks.iloc[nearest_indices]
best_inx = None
best_comp = 0
best_merged_geometry = None
for near_block in near_blocks.iloc:
large_block_idx = near_block.name
merged_geometry = large_blocks_copy.loc[large_block_idx, 'geometry'].union(
small_blocks.loc[idx, 'geometry'])
if isinstance(merged_geometry, Polygon):
compactness = cal_compactness(merged_geometry)
if compactness > best_comp:
best_comp = compactness
best_inx = large_block_idx
best_merged_geometry = merged_geometry
if best_inx is not None:
large_blocks_copy.loc[best_inx, 'geometry'] = best_merged_geometry
eliminate_cnt += 1
else:
new_row = small_blocks.loc[idx]
large_blocks_copy.append(new_row, ignore_index=True)
# print('无法消除该小地块', idx)
eliminate_rate = eliminate_cnt / small_blocks.shape[0]
merged_blocks = gpd.GeoDataFrame(large_blocks_copy, crs=gdf.crs)
return merged_blocks, eliminate_rate
gdf = gpd.read_file('./data/消除小地块.shp')
merged_blocks, eliminate_rate = eliminate_small_blocks(gdf, small_cut_th=9000000, kdtree_search_radius=10000)
merged_blocks.to_file('./data/消除小地块_消除后.shp')
print('成功消除小地块比例:', eliminate_rate)
网友评论