ducktype

作者: hehehehe | 来源:发表于2024-07-30 20:06 被阅读0次
    import inspect
    import json
    import time
    import types
    from typing import List, Union
    
    import duckdb
    import pandas as pd
    import polars as pl
    from duckdb.duckdb.typing import VARCHAR
    from duckdb.typing import DOUBLE, DuckDBPyType
    from shapely import Point
    from sqlalchemy import create_engine
    
    from check_plugins import plugin_common
    from tools import db_util
    
    # Module-wide in-memory DuckDB connection shared by every helper in this file.
    conn = duckdb.connect(database=':memory:')
    # NOTE(review): the spatial extension is left disabled here, yet several helpers
    # below call st_* SQL functions -- confirm where (or whether) it gets loaded
    # before those helpers run.
    # conn.install_extension("spatial")
    # conn.load_extension("spatial")
    
    # Install and load the postgres extension as an import-time side effect;
    # presumably required by attach() and postgres_scan() below.
    conn.install_extension("postgres")
    conn.load_extension("postgres")
    
    
    def sqlalchemy():
        """Read lane-mark links from PostgreSQL via pandas and run spatial SQL on them.

        Fetches ten ``HAD_LANE_MARK_LINK`` rows (pid + WKT) through a SQLAlchemy
        engine, materialises a centroid table and a buffered-polygon table in
        DuckDB, prints both, and finally prints the point/polygon pairs that
        intersect.
        """
        # NOTE(review): relation ``a`` is not created anywhere in this function --
        # confirm it exists on the shared connection before calling.
        conn.sql("""
            select check_id,row_number() over (partition by check_id) from a
        """).show()
    
        query = 'SELECT "LANE_MARK_LINK_PID" as pid,st_astext("GEOMETRY") as wkt FROM "HAD_LANE_MARK_LINK" limit 10'
        engine = db_util.get_sqlalchemy_engine(db)
        try:
            # pandas.read_sql names its connectable parameter ``con``; the previous
            # ``conn=`` keyword raised TypeError before any query was sent.
            # The frame must stay named ``df2``: the SQL below refers to it by name.
            df2 = pd.read_sql(query, con=engine)
        finally:
            # Release the engine's connections even when the read fails.
            engine.dispose()
        print(df2.shape)
    
        conn.sql("create table lane_mark_point as select *,st_centroid(st_geomfromtext(wkt)) as point from df2")
        conn.sql("create table lane_mark_polygon as select *,st_buffer(st_geomfromtext(wkt),0.001) as polygon from df2")
    
        conn.sql("select * from lane_mark_point").show()
        conn.sql("select * from lane_mark_polygon").show()
    
        conn.sql("""
                select a.point,b.polygon from lane_mark_point a,lane_mark_polygon b where st_intersects(b.polygon,a.point)
            """).show()
    
    
    def attach():
        """Attach the ``gis_db`` PostgreSQL database read-only, sample it, then detach.

        Prints the visible tables, copies ten ``public.cmap_qc_log`` rows into a
        local table ``abc`` and prints them, prints ten ``tiger.pagc_gaz`` rows,
        and always detaches the database afterwards.
        """
        # The connection string previously listed ``user=postgres`` twice.
        conn.sql("""
            ATTACH 'dbname=gis_db user=postgres host=localhost port=5433 password=postgres' AS gis_db (TYPE POSTGRES, READ_ONLY);
        """)
        try:
            conn.sql("SHOW ALL TABLES;").show()
            conn.sql("create table abc as select * from gis_db.public.cmap_qc_log limit 10")
            conn.sql("select * from abc").show()
            # ``bg`` looks unused but is referenced by name in the next SQL statement.
            bg = conn.sql(" select * from gis_db.tiger.pagc_gaz limit 10")
            conn.sql("select * from bg").show()
        finally:
            # Detach even if a query above fails, so the shared connection is not
            # left with ``gis_db`` attached (a second ATTACH would then fail).
            conn.sql("""DETACH gis_db; """)
    
    
    def postgres_scan():
        """Print ten rows of ``public.cmap_qc_log`` read through ``postgres_scan``."""
        scan_sql = """
            SELECT * FROM postgres_scan('host=localhost port=5433 dbname=gis_db user=postgres password=postgres', 'public', 'cmap_qc_log') limit 10    
        """
        preview = conn.sql(scan_sql)
        preview.show()
    
    
    # Connection settings passed to db_util.get_sqlalchemy_engine() by helpers in this file.
    db = dict(
        host="localhost",
        port=5433,
        user="postgres",
        password="postgres",
        database="gis_db",
    )
    
    
    def get_df():
        """Return up to ten rows of ``cmap_qc_log`` as a pandas DataFrame.

        The SQLAlchemy engine is created per call from the module-level ``db``
        settings and disposed before returning, matching ``sqlalchemy()``.
        """
        engine = db_util.get_sqlalchemy_engine(db)
        try:
            df2 = pd.read_sql(" SELECT * FROM cmap_qc_log limit 10", con=engine)
        finally:
            # Previously the engine was never disposed, leaking its connections
            # on every call.
            engine.dispose()
        return df2
    
    
    def ts2():
        """Print the sample records whose geometry is empty or not a POLYGON.

        Builds a pandas DataFrame from two hard-coded records (each carrying a
        ``POLYGON Z`` WKT string) and runs the geometry-type filter through the
        shared DuckDB connection.
        """
        # Two hard-coded sample records; ``geometry`` holds WKT text.
        a = [
            {
                "id": "84206518689653345",
                "object_pid": "84206518689653345",
                "create_time": "",
                "update_time": "",
                "material_info": "{\"material_id\": 1, \"timestamp\": \"1722310965826\", \"type\": 0, \"task_id\": 1}",
                "operator": "navinfo_1_auto",
                "version": 1,
                "mesh": "20597788",
                "geometry": "POLYGON Z ((116.155128 40.127199 0, 116.155127 40.127204 0, 116.155122 40.127266 0, 116.155113 40.127342 0, 116.155099 40.127441 0, 116.155086 40.1275 0, 116.155068 40.127556 0, 116.155065 40.127562 0, 116.155051 40.127599 0, 116.155047 40.127608 0, 116.155044 40.127615 0, 116.155039 40.127626 0, 116.155049 40.127628 0, 116.155052 40.127623 0, 116.155057 40.12761 0, 116.15506 40.127604 0, 116.155062 40.1276 0, 116.155071 40.127576 0, 116.155076 40.127562 0, 116.155077 40.12756 0, 116.155078 40.127558 0, 116.155093 40.127558 0, 116.155093 40.12758 0, 116.155093 40.127594 0, 116.155093 40.127642 0, 116.155127 40.127642 0, 116.155128 40.127594 0, 116.155128 40.127558 0, 116.155128 40.127536 0, 116.155128 40.127513 0, 116.155129 40.127477 0, 116.155129 40.127419 0, 116.155129 40.12736 0, 116.15513 40.12731 0, 116.15513 40.127306 0, 116.15513 40.127228 0, 116.155131 40.127205 0, 116.155128 40.127199 0))"
            },
            {
                "id": "84206525828358720",
                "object_pid": "84206525828358720",
                "create_time": "",
                "update_time": "",
                "material_info": "{\"material_id\": 1, \"timestamp\": \"1722310965826\", \"type\": 0, \"task_id\": 1}",
                "operator": "navinfo_1_auto",
                "version": 1,
                "mesh": "20597788",
                "geometry": "POLYGON Z ((116.157074 40.1271 0, 116.157051 40.127117 0, 116.157084 40.127144 0, 116.157102 40.127159 0, 116.157117 40.127171 0, 116.157123 40.127176 0, 116.157124 40.127177 0, 116.157125 40.127178 0, 116.157125 40.127179 0, 116.157126 40.127181 0, 116.157117 40.127187 0, 116.157101 40.127178 0, 116.157087 40.12717 0, 116.157059 40.127155 0, 116.157024 40.127137 0, 116.157017 40.127144 0, 116.157042 40.127156 0, 116.157048 40.127159 0, 116.157056 40.127163 0, 116.157082 40.127177 0, 116.157111 40.127193 0, 116.157112 40.127194 0, 116.157152 40.127217 0, 116.157181 40.127235 0, 116.157208 40.127253 0, 116.157241 40.127277 0, 116.15728 40.127307 0, 116.157315 40.127334 0, 116.157336 40.127354 0, 116.157337 40.127354 0, 116.157312 40.127323 0, 116.157258 40.127263 0, 116.15722 40.127225 0, 116.157183 40.12719 0, 116.15718 40.127187 0, 116.157152 40.127163 0, 116.157111 40.127128 0, 116.157082 40.127105 0, 116.157074 40.1271 0))"
            }
        ]
    
        # The local name ``df`` is referenced by the SQL below, so it must not be renamed.
        df = pd.DataFrame(a)
        # Earlier experiments, kept for reference: validity check and geometry-type listing.
        # t1 = conn.sql("select id,st_isvalid(st_geomfromtext(geometry)) as val from df")
        # conn.sql("select * from t1 where val is FALSE ").show()
        # t1 = conn.sql(" select id,st_geometrytype(st_geomfromtext(geometry)) as val from df  ")
        # conn.sql("select * from t1 where val != 'POLYGON' ").show()
    
        # Select rows whose geometry text is empty or whose parsed type is not POLYGON.
        # ``t1`` is likewise referenced by name in the follow-up query.
        t1 = conn.sql(
            " select id,geometry from df where geometry = '' or st_geometrytype(st_geomfromtext(geometry)) != 'POLYGON' ")
        conn.sql("select * from t1  ").show()
        # print(df)
    
    
    def wkbtest():
        """Decode a hex WKB sample with shapely and parse its hex form in DuckDB.

        Prints the geometry's ``wkb_hex`` as produced by shapely, then prints the
        result of ``ST_GeomFromHexWKB`` applied to that same hex string.
        """
        from shapely import from_wkb
    
        hex_wkb = "010200008004000000C2A98BEB8A1A5D407C98DE7AA40244402D431CEBE2360A3F8136A417961A5D4044D72A66A90244402D431CEBE2360A3F7E5C34C7A61A5D408DF0B404B20244402D431CEBE2360A3FC68A3B5AAC1A5D400A638F79B20244402D431CEBE2360A3F"
        geom = from_wkb(hex_wkb)
        print(geom.wkb_hex)
    
        # The hex text is interpolated straight into the SQL literal.
        parse_sql = f"""
                select  ST_GeomFromHexWKB('{geom.wkb_hex}')
            """
        conn.sql(parse_sql).show()
    
    
    def sql_type_test():
        """Fetch lane-mark links for one mesh and filter them on a cast ``mark_type``.

        Prints the first ten fetched rows, then the rows whose ``mark_type`` cast
        to int is in ``('2')`` or whose ``mark_type`` is null.
        """
        fetch_sql = """select id,mark_type from wdb."HAD_LANE_MARK_LINK" where mesh = '20596646'; """
        source_cfg = {"data_source": "wdb"}
        fetched_rows = plugin_common.fetch_pg(fetch_sql, source_cfg)
    
        # Keep the name ``df``: the filter SQL below refers to the frame by that name.
        df = plugin_common.rows2df(fetched_rows)
        print(df.head(10))
    
        filter_sql = " select id,mark_type from df where mark_type::int in ( '2' ) or mark_type is null "
        conn.sql(filter_sql).show()
    
    
    def null_test():
        """Show how NULL values behave in geometry-type and ``NOT IN`` filters.

        Builds a three-row frame whose last row is entirely ``None`` and prints
        the full frame followed by two filtered queries.
        """
        column_names = ("a", "b", "c")
        row_values = [
            (1, 'Flamingo', "point(1 2)"),
            (2, 'Dog', "point(1 2)"),
            (None, None, None),
        ]
        records = [dict(zip(column_names, values)) for values in row_values]
    
        # Keep the name ``df``: every query below refers to the frame by that name.
        df = pd.DataFrame(records)
        conn.sql("select * from df").show()
    
        # Original author's note on both queries: the result is empty.
        geometry_filter = "select * from df where st_geometrytype(st_geomfromtext(c)) != 'POINT'"
        conn.sql(geometry_filter).show()
        not_in_filter = "select a::integer from df where a not in (1,2) "
        conn.sql(not_in_filter).show()
    
    
    def udf_test():
        """Register the static methods of a local ``UDF`` class in DuckDB and call them.

        Prints the discovered static methods, registers them on the shared
        connection, prints the results of ``udf3``/``udf4``/``udf5``, and finally
        prints a few ``DuckDBPyType`` conversions of Python type hints.
        """
    
        class UDF(object):
            """Namespace whose static methods are registered as DuckDB scalar functions."""
    
            @classmethod
            def get_static_methods(cls):
                """Return ``[name, staticmethod_object]`` pairs declared directly on the class."""
                return [
                    [attr_name, attr_value]
                    for attr_name, attr_value in cls.__dict__.items()
                    if isinstance(attr_value, staticmethod)
                ]
    
            @staticmethod
            def udf1(name: str) -> str:
                return name + "_udf1"
    
            @staticmethod
            def udf2(a: str) -> int:
                return int(a)
    
            @staticmethod
            def udf3(a):
                return [1, 2, 3]
    
            @staticmethod
            def udf4(a):
                return [1, 2, 3]
    
            @staticmethod
            def udf5(a: str) -> list[int]:
                return [1, 2, 3]
    
            @classmethod
            def reg(cls, duckdb_conn):
                """Register every static method on ``duckdb_conn`` under its own name.

                ``udf3`` and ``udf4`` carry no annotations, so their parameter and
                return types are passed explicitly; the remaining functions are
                registered without explicit types.
                """
                for attr_name, attr_value in cls.get_static_methods():
                    # ``cls.__dict__`` holds staticmethod descriptor objects, which
                    # are not callable before Python 3.10; hand DuckDB the plain
                    # function they wrap instead.
                    func = attr_value.__func__
                    if attr_name == 'udf3':
                        duckdb_conn.create_function(attr_name, func, return_type=DuckDBPyType(list[int]),
                                                    parameters=[DuckDBPyType(list[int])],
                                                    null_handling="special", exception_handling="return_null")
                    elif attr_name == 'udf4':
                        duckdb_conn.create_function(attr_name, func, return_type='BIGINT[]', parameters=[VARCHAR],
                                                    null_handling="special", exception_handling="return_null")
                    else:
                        duckdb_conn.create_function(attr_name, func,
                                                    null_handling="special", exception_handling="return_null")
    
        print(UDF.get_static_methods())
        # conn.create_function("aaa", UDF.udf1,  null_handling="special", exception_handling="return_null")
        UDF.reg(conn)
        conn.sql("select udf3([1,2])").show()
        conn.sql("select udf4('123')").show()
        conn.sql("select udf5('123')").show()
        # Show how DuckDB maps a few Python type hints onto its own types.
        print(DuckDBPyType(list[dict[Union[str, int], str]]))
        print(DuckDBPyType(list[int]))
        print(DuckDBPyType(list[str]))
        print(DuckDBPyType(dict[str, int]))
        print([DuckDBPyType(list[int])])
    
    
    def polars_test():
        """Round-trip a small polars frame through DuckDB and filter a list column.

        Casts ``age`` to text, drops rows with a null ``age``, prints the
        remaining ages, reads the frame back from DuckDB as polars, prints it
        and each of its rows, then prints a ``list_filter`` over the ``list``
        column.
        """
        import polars as pl
    
        data = [
            {"name": "Alice", "age": 25, "city": "New York", "list": [1, 23]},
            {"name": "Bob", "age": 30, "city": "San Francisco", "list": [1, 23]},
            {"name": "Charlie", "age": None, "city": "Los Angeles", "list": [1, 23]}
        ]
    
        # Convert the list of dicts to a DataFrame. The names ``df`` and ``df_pl``
        # are referenced by the SQL below and must not change.
        df = (
            pl.from_dicts(data)
            .with_columns(pl.col('age').cast(pl.Utf8))
            .drop_nulls(subset=['age'])
        )
        print(df['age'].to_list())
    
        df_pl = conn.sql("select * from df").pl()
        print(df_pl)
        for named_row in df_pl.iter_rows(named=True):
            print(named_row)
    
        list_filter_sql = "select list_filter(list,x->list_contains([1,12,14,35,34],x)) from df_pl"
        conn.sql(list_filter_sql).show()
    
    
    def st_collect():
        """Print, per ``a`` group, the ``st_collect`` of the group's parsed point geometries."""
        point_rows = [
            {"a": "a", "geom": "point(1 1)"},
            {"a": "a", "geom": "point(1 1)"},
        ]
        # Keep the name ``df``: the SQL below refers to the frame by that name.
        df = pl.from_dicts(point_rows)
        collect_sql = " select a,st_collect(list(st_geomfromtext(geom))) from df group by a "
        conn.sql(collect_sql).show()
    
    
    def row_number():
        """Print one row per ``(a, b)`` pair from a five-row sample using ROW_NUMBER.

        The window has no ORDER BY, so which row of a pair is kept is left to
        DuckDB.
        """
        sample_rows = [
            {"a": key, "b": key, "c": f"c{index}", "d": f"d{index}"}
            for index, key in enumerate((1, 1, 2, 2, 3), start=1)
        ]
        # The names ``b`` and ``t1`` are referenced by the SQL below; do not rename them.
        b = pl.from_dicts(sample_rows)
        dedupe_sql = """
                WITH ranked_rows AS (
                    SELECT  a,  b,  c,  d, 
                    ROW_NUMBER() OVER (PARTITION BY a, b ) AS rn
                    FROM  b ) SELECT 
                    a, b,  c,  d
                FROM  ranked_rows where rn = 1
    
        """
        t1 = conn.sql(dedupe_sql)
        conn.sql("select * from t1").show()
    
    
    def ext_exits():
        """Print the ``(installed, loaded)`` row DuckDB reports for the spatial extension."""
        status_sql = ("SELECT installed,loaded FROM duckdb_extensions() "
                      "WHERE extension_name = 'spatial';")
        status_row = conn.execute(status_sql).fetchone()
        print(status_row)
    
    
    if __name__ == '__main__':
        # Script entry point: run one experiment and print its wall-clock duration in seconds.
        # The commented-out lines are alternative experiments that can be swapped in.
        # a = time.time()
        # postgres_scan()
        # print(time.time() - a)
        a = time.time()  # start timestamp
        # df3 = get_df()
        # conn.sql("select * from df3").show()
        # null_test()
        udf_test()
        print(time.time() - a)  # elapsed seconds
    
    
    def wdbtest():
        """Decode a hex WKB sample, parse it in DuckDB, then run the geometry-type filter on it.

        Prints the geometry's ``wkb_hex``, the result of ``ST_GeomFromHexWKB`` on
        that hex text, and finally the rows of a one-record frame whose geometry
        is empty or not a POLYGON.
        """
        from shapely import from_wkb
    
        a = "010200008004000000C2A98BEB8A1A5D407C98DE7AA40244402D431CEBE2360A3F8136A417961A5D4044D72A66A90244402D431CEBE2360A3F7E5C34C7A61A5D408DF0B404B20244402D431CEBE2360A3FC68A3B5AAC1A5D400A638F79B20244402D431CEBE2360A3F"
        a_geom = from_wkb(a)
        print(a_geom.wkb_hex)
        conn.sql(
            f"""
                select  ST_GeomFromHexWKB('{a_geom.wkb_hex}')
            """).show()
    
        # The original passed the raw hex string to pd.DataFrame(), which raises
        # ValueError for a plain str, so nothing below could ever run. Wrap the
        # decoded geometry in a single record carrying the ``id`` and WKT
        # ``geometry`` columns that the query expects.
        # NOTE(review): the ``id`` value is a placeholder -- confirm what the
        # original intended to put in this frame.
        # The names ``df`` and ``t1`` are referenced by the SQL and must not change.
        df = pd.DataFrame([{"id": 1, "geometry": a_geom.wkt}])
    
        t1 = conn.sql(
            " select id,geometry from df where geometry = '' or st_geometrytype(st_geomfromtext(geometry)) != 'POLYGON' ")
        conn.sql("select * from t1  ").show()
    
    
    if __name__ == '__main__':
        # Ad-hoc script: for fill-area objects in two meshes, print whether each object's
        # geometry intersects the collected geometries found for its lane. Intermediate
        # results are materialised as DuckDB tables df, df2, df3 and df4.
        # NOTE(review): get_yaml_cfg_by_key, get_objid_laneid_rows and plugin_hdmap_util are
        # not imported in the visible part of this file -- confirm where they come from.
        job_info_cfg = {"data_source": "wdb", "fetchdb_where_and": ""}
        hdmap_db_cfg = get_yaml_cfg_by_key("wdb")
        # Fetch id + WKT geometry for the two hard-coded meshes.
        sql = f""" select id,st_astext(geometry) as geometry
                 from {hdmap_db_cfg['schema']}."HAD_OBJECT_FILL_AREA" 
                 where mesh in( '20596658','20596623')
             """
        rows = db_util.fetch_pg(hdmap_db_cfg, sql)
        rows_dict = [dict(row) for row in rows]
        layer_df = plugin_common.rows2df(rows_dict)
        print(layer_df.head(2))
    
        # Drop rows without an id and collect the remaining ids as a set of strings.
        # presumably rows2df returns a polars frame (drop_nulls/to_list) -- TODO confirm
        layer_df = layer_df.drop_nulls(subset=['id'])
        obj_id_vals = layer_df['id'].to_list()
        obj_id_vals = set(map(str, obj_id_vals))
        objid_laneid_rows = get_objid_laneid_rows(obj_id_vals, job_info_cfg)
        if objid_laneid_rows:
            objid_laneid_df = plugin_common.rows2df(objid_laneid_rows)
            lane_ids = [i['lane_id'] for i in objid_laneid_rows]
            boundid_llaneid_rlaneid_rows = plugin_hdmap_util.fetch_boundid_llaneid_rlaneid_rows(lane_ids, job_info_cfg)
            if boundid_llaneid_rlaneid_rows:
                boundid_llaneid_rlaneid_df = plugin_common.rows2df(boundid_llaneid_rlaneid_rows)
                with plugin_common.get_duckdb_conn_context() as duckdb_conn:
                    # df: pair every (obj_id, lane_id) row with each boundary row whose left or
                    # right lane id equals that lane id (compared as text). The Python frames
                    # are referenced by their variable names inside the SQL.
                    duckdb_conn.sql(f"create table df as "
                                    f"select a.obj_id,a.lane_id,b.lane_bound_id, geometry"
                                    f" from objid_laneid_df a, boundid_llaneid_rlaneid_df b "
                                    f" where a.lane_id::text=b.l_lane_id::text "
                                    f" or a.lane_id::text=b.r_lane_id::text")
                    # df2: keep one row per (lane_id, lane_bound_id) -- the window has no
                    # ORDER BY, so which row survives is unspecified -- and parse the WKT.
                    duckdb_conn.sql(""" create table df2 as
                            WITH ranked_rows AS (
                                SELECT  obj_id,lane_id,lane_bound_id,geometry,
                                ROW_NUMBER() OVER (PARTITION BY lane_id,lane_bound_id ) AS rn
                                FROM  df ) 
                            SELECT 
                                obj_id,lane_id,lane_bound_id,st_geomfromtext(geometry) as geometry
                            FROM  ranked_rows where rn = 1
                    """)
                    # df3: per lane_id, take the first collected obj_id and merge the lane's
                    # geometries into a single collection.
                    duckdb_conn.sql(f"create table df3 as "
                                    f"select list(obj_id)[1] as obj_id,lane_id,st_collect(list(geometry))as multi_geom  "
                                    f" from df2 group by lane_id ")
                    # df4: join back to layer_df on obj_id = id to bring in the object's WKT geometry.
                    duckdb_conn.sql(
                        f"create table df4 as select b.obj_id,b.lane_id,multi_geom,a.geometry "
                        f" from layer_df a, df3 b"
                        f" where b.obj_id = a.id")
    
                    # Print, per (obj_id, lane_id), whether the collection intersects the object geometry.
                    duckdb_conn.sql(
                        f"select obj_id,lane_id,st_intersects(multi_geom,st_geomfromtext(geometry)) "
                        f" from df4 ").show()
    
    
    
    
    if __name__ == '__main__':
        # Ad-hoc script: for fill-area objects in two meshes, print whether each object's
        # geometry intersects the collected geometries found for its lane. Intermediate
        # results are kept as DuckDB relations t1..t4 rather than created tables.
        # NOTE(review): get_yaml_cfg_by_key, get_objid_laneid_rows and plugin_hdmap_util are
        # not imported in the visible part of this file -- confirm where they come from.
        job_info_cfg = {"data_source": "wdb", "fetchdb_where_and": ""}
        hdmap_db_cfg = get_yaml_cfg_by_key("wdb")
        # Fetch id + WKT geometry for the two hard-coded meshes.
        sql = f""" select id,st_astext(geometry) as geometry
                 from {hdmap_db_cfg['schema']}."HAD_OBJECT_FILL_AREA" 
                 where mesh in( '20596658','20596623')
             """
        rows = db_util.fetch_pg(hdmap_db_cfg, sql)
        rows_dict = [dict(row) for row in rows]
        layer_df = plugin_common.rows2df(rows_dict)
        print(layer_df.head(2))
    
        # Drop rows without an id and collect the remaining ids as a set of strings.
        # presumably rows2df returns a polars frame (drop_nulls/to_list) -- TODO confirm
        layer_df = layer_df.drop_nulls(subset=['id'])
        obj_id_vals = layer_df['id'].to_list()
        obj_id_vals = set(map(str, obj_id_vals))
        objid_laneid_rows = get_objid_laneid_rows(obj_id_vals, job_info_cfg)
        if objid_laneid_rows:
            objid_laneid_df = plugin_common.rows2df(objid_laneid_rows)
            lane_ids = [i['lane_id'] for i in objid_laneid_rows]
            boundid_llaneid_rlaneid_rows = plugin_hdmap_util.fetch_boundid_llaneid_rlaneid_rows(lane_ids, job_info_cfg)
            if boundid_llaneid_rlaneid_rows:
                boundid_llaneid_rlaneid_df = plugin_common.rows2df(boundid_llaneid_rlaneid_rows)
                with plugin_common.get_duckdb_conn_context() as duckdb_conn:
                    # t1: pair every (obj_id, lane_id) row with each boundary row whose left or
                    # right lane id equals that lane id (compared as text). The Python frames
                    # and the relations t1..t4 are referenced by variable name inside the SQL,
                    # so these names must not be changed.
                    t1 = duckdb_conn.sql(f" select a.obj_id,a.lane_id,b.lane_bound_id, geometry"
                                         f" from objid_laneid_df a, boundid_llaneid_rlaneid_df b "
                                         f" where a.lane_id::text=b.l_lane_id::text "
                                         f" or a.lane_id::text=b.r_lane_id::text")
                    # t2: keep one row per (lane_id, lane_bound_id) -- the window has no
                    # ORDER BY, so which row survives is unspecified -- and parse the WKT.
                    t2 = duckdb_conn.sql(""" 
                            WITH ranked_rows AS (
                                SELECT  obj_id,lane_id,lane_bound_id,geometry,
                                ROW_NUMBER() OVER (PARTITION BY lane_id,lane_bound_id ) AS rn
                                FROM  t1 ) 
                            SELECT 
                                obj_id,lane_id,lane_bound_id,st_geomfromtext(geometry) as geometry
                            FROM  ranked_rows where rn = 1
                    """)
                    # t3: per lane_id, take the first collected obj_id and merge the lane's
                    # geometries into a single collection.
                    t3 = duckdb_conn.sql(
                        f" select list(obj_id)[1] as obj_id,lane_id,st_collect(list(geometry))as multi_geom  "
                        f" from t2 group by lane_id ")
                    # t4: join back to layer_df on obj_id = id to bring in the object's WKT geometry.
                    t4 = duckdb_conn.sql(
                        f" select b.obj_id,b.lane_id,multi_geom,a.geometry "
                        f" from layer_df a, t3 b"
                        f" where b.obj_id = a.id")
    
                    # Print, per (obj_id, lane_id), whether the collection intersects the object geometry.
                    duckdb_conn.sql(
                        f"select obj_id,lane_id,st_intersects(multi_geom,st_geomfromtext(geometry)) "
                        f" from t4 ").show()
    

    相关文章

      网友评论

          本文标题:ducktype

          本文链接:https://www.haomeiwen.com/subject/giwyhjtx.html