file_util

Author: hehehehe | Published 2024-05-25 09:21
    import json
    import os
    import shutil
    from pathlib import Path
    
    import geojson
    import pandas as pd
    from geojson import FeatureCollection
    from shapely.geometry import shape
    import geopandas as gpd
    
    encodings = ['gbk', 'utf-8']  # candidate CSV encodings, tried in order
    
    
    def read_csv(file_path):
        for encoding in encodings:
            try:
                return pd.read_csv(file_path, encoding=encoding)
            except Exception as e:
                print(f"读取文件时发生编码错误:{e}, 尝试使用下一个编码")
    
    
    def json2file(file, jsonObj):
        if jsonObj:
            Path(file).write_text(json.dumps(jsonObj, ensure_ascii=False), encoding='utf-8')
    
    
    def lines2file(file, lines):
        with open(file, 'w', encoding='utf-8') as f:
            f.writelines(lines)
    
    
    def geojson2file(file, geojson_str):
        # Writes an already-serialized GeoJSON string to disk.
        Path(file).write_text(geojson_str, encoding='utf-8')
    
    
    def features2file(file, features):
        if features:
            fc = geojson.FeatureCollection(features)
            Path(file).write_text(geojson.dumps(fc), encoding='utf-8')
    
    
    def features2shp(out_file, features):
        if features:
            gpd_datas = []
            for feature in features:
                properties = feature['properties']
                gpd_data = {}
                for k, v in properties.items():
                    if isinstance(v, list):
                        gpd_data[k] = json.dumps(v)
                    else:
                        gpd_data[k] = v
                geometry = feature.get('geometry', None)
                if geometry:
                    geometry = shape(geometry)
                gpd_data['geometry'] = geometry
                gpd_datas.append(gpd_data)
            gdf = gpd.GeoDataFrame(gpd_datas)
            gdf.to_file(out_file)
    
    
    def make_dir(path):
        # mkdir(exist_ok=True) already tolerates an existing directory, so no exists() check is needed.
        Path(path).mkdir(parents=True, exist_ok=True)
    
    
    def file2json(file):
        if Path(file).exists():
            text = Path(file).read_text(encoding='utf-8')
            return json.loads(text)
    
    
    def write(file, content):
        # Binary write; `content` is expected to be bytes.
        with open(file, "wb") as f:
            f.write(content)
    
    
    def file2geojson(file):
        if Path(file).exists():
            text = Path(file).read_text(encoding='utf-8')
            return geojson.loads(text)
    
    
    def file2features(file):
        if Path(file).exists():
            geojson_fc = json.loads(Path(file).read_text(encoding='utf-8'))
            return geojson_fc['features']
    
    
    def merge_json(path, out_file):
        files = Path(path).rglob("*.json")
        all_data = []
        for f in files:
            json_data = file2json(f)
            all_data.extend(json_data)
        json2file(out_file, all_data)
    
    
    def merge_geojson(path, pattern, out_file):
        files = list(Path(path).rglob(pattern))  # materialize the generator so the emptiness check works
        if files:
            all_features = []
            for f in files:
                feature_collection = geojson.loads(Path(f).read_text(encoding='utf-8'))
                all_features.extend(feature_collection['features'])
            if all_features:
                fc = FeatureCollection(all_features)
                geojson2file(out_file, geojson.dumps(fc))
    
    
    def geojson2check_fmt(file, out_file):
        # Flatten point features (assumed to carry x/y/z coordinates) into a list of property records.
        file_geojson = file2geojson(file)
        if file_geojson:
            result = []
            for feature in file_geojson['features']:
                geom = feature['geometry']
                coord = geom['coordinates']
                properties = feature['properties']
                properties.update({'x': coord[0], 'y': coord[1], 'z': coord[2]})
                result.append(properties)
            if result:
                json2file(out_file, result)
                outfile_p = Path(out_file)
                pd.DataFrame(result).to_csv(outfile_p.parent.joinpath(f"{outfile_p.stem}.csv"), index=False)
    
    
    def tiles_cut(tiles, size):
        # Split a list into consecutive chunks of `size`, e.g. [1, 2, 3, 4, 5] with size=2 -> [[1, 2], [3, 4], [5]].
        return [tiles[i:i + size] for i in range(0, len(tiles), size)]
    
    
    def get_file_size(file) -> float:
        # File size in kilobytes.
        return Path(file).stat().st_size / 1024
    
    
    def make_archive_from_folders(folders_list, output_filename, output_format='zip'):
        """
        :param folders_list: 要压缩的文件夹路径列表。
        :param output_filename: 输出文件名,不含扩展名。
        :param output_format: 归档文件格式,例如 'zip' 或 'tar'。
        """
        # 创建一个临时目录来收集所有要压缩的文件
        temp_dir = 'temp_archive_contents'
        if Path(temp_dir).exists():
            shutil.rmtree(temp_dir)
        os.makedirs(temp_dir, exist_ok=True)
    
        for folder in folders_list:
            if Path(folder).exists():
                target_path = os.path.join(temp_dir, os.path.basename(folder))
                shutil.copytree(folder, target_path)
    
        shutil.make_archive(output_filename, output_format, temp_dir)
    
        shutil.rmtree(temp_dir)
        print(f"Archive {output_filename}.{output_format} has been created.")
    
    
    if __name__ == '__main__':
        geojson2check_fmt(r"D:\data\postdata\dt_main2\prod_road_task_heze_1698041212\groups_all.geojson",
                          r"D:\data\postdata\dt_main2\prod_road_task_heze_1698041212\groups_all.json")
    
    
