import inspect
import json
import time
import types
from typing import List, Union
import duckdb
import pandas as pd
import polars as pl
from duckdb.duckdb.typing import VARCHAR
from duckdb.typing import DOUBLE, DuckDBPyType
from shapely import Point
from sqlalchemy import create_engine
from check_plugins import plugin_common
from tools import db_util
# Shared in-memory DuckDB connection used by every experiment in this file.
conn = duckdb.connect(":memory:")
# NOTE(review): loading of the spatial extension is commented out here, yet
# several functions below call st_* SQL functions (presumably provided by
# that extension) -- confirm it is loaded some other way before running them.
# conn.install_extension("spatial")
# conn.load_extension("spatial")
# Install, then load, each extension the experiments rely on.
for _extension in ("postgres",):
    conn.install_extension(_extension)
    conn.load_extension(_extension)
def sqlalchemy():
    """Read lane-mark links from Postgres with pandas and join them in DuckDB.

    Runs a row_number() query, loads ten rows of ``HAD_LANE_MARK_LINK`` into a
    pandas DataFrame through a SQLAlchemy engine, materialises centroid and
    buffer tables from the WKT column in the shared DuckDB connection, and
    shows the point/polygon intersections.
    """
    # NOTE(review): nothing in this function creates a table or DataFrame
    # named `a`; presumably it is expected to exist already -- confirm.
    conn.sql(
        "\n"
        "select check_id,row_number() over (partition by check_id) from a\n"
    ).show()

    query = 'SELECT "LANE_MARK_LINK_PID" as pid,st_astext("GEOMETRY") as wkt FROM "HAD_LANE_MARK_LINK" limit 10'
    engine = db_util.get_sqlalchemy_engine(db)
    try:
        # pandas names this keyword `con`; passing `conn=` raises TypeError.
        df2 = pd.read_sql(query, con=engine)
    finally:
        # Release the engine's pooled connections even if the read fails.
        engine.dispose()
    print(df2.shape)

    # `df2` is referenced by name inside the SQL below, so the local variable
    # must keep this name.
    conn.sql("create table lane_mark_point as select *,st_centroid(st_geomfromtext(wkt)) as point from df2")
    conn.sql("create table lane_mark_polygon as select *,st_buffer(st_geomfromtext(wkt),0.001) as polygon from df2")
    conn.sql("select * from lane_mark_point").show()
    conn.sql("select * from lane_mark_polygon").show()
    conn.sql(
        "\n"
        "select a.point,b.polygon from lane_mark_point a,lane_mark_polygon b where st_intersects(b.polygon,a.point)\n"
    ).show()
def attach():
    """Attach the Postgres database ``gis_db`` read-only and query it.

    Lists all tables, copies ten rows of ``public.cmap_qc_log`` into a local
    table ``abc``, shows them, shows ten rows of ``tiger.pagc_gaz`` and
    finally detaches the database.
    """
    # The connection string previously listed `user=postgres` twice; the
    # duplicate key is dropped.
    conn.sql(
        "\n"
        "ATTACH 'dbname=gis_db host=localhost port=5433 user=postgres password=postgres' "
        "AS gis_db (TYPE POSTGRES, READ_ONLY);\n"
    )
    try:
        conn.sql("SHOW ALL TABLES;").show()
        conn.sql("create table abc as select * from gis_db.public.cmap_qc_log limit 10")
        conn.sql("select * from abc").show()
        # `bg` is referenced by name in the next query; keep the variable name.
        bg = conn.sql(" select * from gis_db.tiger.pagc_gaz limit 10")
        conn.sql("select * from bg").show()
    finally:
        # Always detach so a failed query does not leave `gis_db` attached on
        # the shared connection (a later ATTACH under the same alias would
        # otherwise collide with it).
        conn.sql("""DETACH gis_db; """)
def postgres_scan():
    """Show ten rows of ``public.cmap_qc_log`` read through ``postgres_scan``."""
    scan_sql = (
        "\n"
        "SELECT * FROM postgres_scan('host=localhost port=5433 dbname=gis_db user=postgres password=postgres', 'public', 'cmap_qc_log') limit 10\n"
    )
    result = conn.sql(scan_sql)
    result.show()
# Connection settings passed to db_util.get_sqlalchemy_engine().
db = dict(
    host="localhost",
    port=5433,
    user="postgres",
    password="postgres",
    database="gis_db",
)
def get_df():
    """Return ten rows of ``cmap_qc_log`` as a pandas DataFrame.

    Returns:
        The DataFrame produced by ``pd.read_sql`` for the query.
    """
    engine = db_util.get_sqlalchemy_engine(db)
    try:
        return pd.read_sql(" SELECT * FROM cmap_qc_log limit 10", con=engine)
    finally:
        # Dispose the engine once the rows are in memory so its connection
        # pool is not leaked (matches the handling in sqlalchemy()).
        engine.dispose()
def ts2():
    """Show sample records whose geometry is empty or not a POLYGON.

    Builds a two-row pandas DataFrame of sample records (WKT ``POLYGON Z``
    geometries) and queries it through the shared DuckDB connection.
    """
    # Fields that are identical in both sample records.
    shared_fields = {
        "create_time": "",
        "update_time": "",
        "material_info": '{"material_id": 1, "timestamp": "1722310965826", "type": 0, "task_id": 1}',
        "operator": "navinfo_1_auto",
        "version": 1,
        "mesh": "20597788",
    }
    # (pid, WKT) pairs; the pid is used for both `id` and `object_pid`.
    samples = (
        (
            "84206518689653345",
            "POLYGON Z ((116.155128 40.127199 0, 116.155127 40.127204 0, 116.155122 40.127266 0, 116.155113 40.127342 0, 116.155099 40.127441 0, 116.155086 40.1275 0, 116.155068 40.127556 0, 116.155065 40.127562 0, 116.155051 40.127599 0, 116.155047 40.127608 0, 116.155044 40.127615 0, 116.155039 40.127626 0, 116.155049 40.127628 0, 116.155052 40.127623 0, 116.155057 40.12761 0, 116.15506 40.127604 0, 116.155062 40.1276 0, 116.155071 40.127576 0, 116.155076 40.127562 0, 116.155077 40.12756 0, 116.155078 40.127558 0, 116.155093 40.127558 0, 116.155093 40.12758 0, 116.155093 40.127594 0, 116.155093 40.127642 0, 116.155127 40.127642 0, 116.155128 40.127594 0, 116.155128 40.127558 0, 116.155128 40.127536 0, 116.155128 40.127513 0, 116.155129 40.127477 0, 116.155129 40.127419 0, 116.155129 40.12736 0, 116.15513 40.12731 0, 116.15513 40.127306 0, 116.15513 40.127228 0, 116.155131 40.127205 0, 116.155128 40.127199 0))",
        ),
        (
            "84206525828358720",
            "POLYGON Z ((116.157074 40.1271 0, 116.157051 40.127117 0, 116.157084 40.127144 0, 116.157102 40.127159 0, 116.157117 40.127171 0, 116.157123 40.127176 0, 116.157124 40.127177 0, 116.157125 40.127178 0, 116.157125 40.127179 0, 116.157126 40.127181 0, 116.157117 40.127187 0, 116.157101 40.127178 0, 116.157087 40.12717 0, 116.157059 40.127155 0, 116.157024 40.127137 0, 116.157017 40.127144 0, 116.157042 40.127156 0, 116.157048 40.127159 0, 116.157056 40.127163 0, 116.157082 40.127177 0, 116.157111 40.127193 0, 116.157112 40.127194 0, 116.157152 40.127217 0, 116.157181 40.127235 0, 116.157208 40.127253 0, 116.157241 40.127277 0, 116.15728 40.127307 0, 116.157315 40.127334 0, 116.157336 40.127354 0, 116.157337 40.127354 0, 116.157312 40.127323 0, 116.157258 40.127263 0, 116.15722 40.127225 0, 116.157183 40.12719 0, 116.15718 40.127187 0, 116.157152 40.127163 0, 116.157111 40.127128 0, 116.157082 40.127105 0, 116.157074 40.1271 0))",
        ),
    )
    records = [
        {"id": pid, "object_pid": pid, **shared_fields, "geometry": wkt}
        for pid, wkt in samples
    ]
    # `df` and `t1` are looked up by name from the SQL text, so these two
    # local names must not change.
    df = pd.DataFrame(records)
    # Earlier variants of this check, kept for reference:
    # t1 = conn.sql("select id,st_isvalid(st_geomfromtext(geometry)) as val from df")
    # conn.sql("select * from t1 where val is FALSE ").show()
    # t1 = conn.sql(" select id,st_geometrytype(st_geomfromtext(geometry)) as val from df ")
    # conn.sql("select * from t1 where val != 'POLYGON' ").show()
    invalid_sql = " select id,geometry from df where geometry = '' or st_geometrytype(st_geomfromtext(geometry)) != 'POLYGON' "
    t1 = conn.sql(invalid_sql)
    conn.sql("select * from t1 ").show()
    # print(df)
def wkbtest():
    """Round-trip a hex WKB geometry through shapely and DuckDB.

    Decodes the hex string with shapely, prints the re-encoded hex and shows
    the result of ``ST_GeomFromHexWKB`` on it.
    """
    from shapely import from_wkb

    # pg = plugin_common.fetch_pg('select geometry from wdb."HAD_OBJECT_ARROW" limit 10;', {"data_source": "wdb"})
    # print(pg)
    # The header bytes (01 | 02000080 | 04000000) mark this as a
    # little-endian LineString with the Z flag and four points.
    wkb_hex = "010200008004000000C2A98BEB8A1A5D407C98DE7AA40244402D431CEBE2360A3F8136A417961A5D4044D72A66A90244402D431CEBE2360A3F7E5C34C7A61A5D408DF0B404B20244402D431CEBE2360A3FC68A3B5AAC1A5D400A638F79B20244402D431CEBE2360A3F"
    a_geom = from_wkb(wkb_hex)
    print(a_geom.wkb_hex)
    # print(f'select ST_GeomFromHexWKB({a_geom.wkt})')
    hex_sql = (
        "\n"
        f"select ST_GeomFromHexWKB('{a_geom.wkb_hex}')\n"
    )
    conn.sql(hex_sql).show()
    # Alternative round trip through WKT, kept for reference:
    # conn.sql(
    # f"""
    # select ST_GeomFromHexWKB(ST_AsHEXWKB( st_geomfromtext('{a_geom.wkt}')))
    # """).show()
def sql_type_test():
    """Filter lane-mark links on ``mark_type`` cast to int, or NULL.

    Fetches ``id``/``mark_type`` for one mesh through ``plugin_common``,
    prints the first ten rows and shows the rows whose ``mark_type`` is 2
    or NULL.
    """
    # df = pd.DataFrame([{"id": 1, "val": "2"}, {"id": 1, "val": "3"}, {"id": 1, "val": None}])
    # conn.sql(" select * from df where val in ( '2' ) or val is null ").show()
    fetch_sql = """select id,mark_type from wdb."HAD_LANE_MARK_LINK" where mesh = '20596646'; """
    fetched_rows = plugin_common.fetch_pg(fetch_sql, {"data_source": "wdb"})
    # `df` is referenced by name inside the SQL below; keep the variable name.
    df = plugin_common.rows2df(fetched_rows)
    preview = df.head(10)
    print(preview)
    # conn.sql(" select * from df limit 10 ").show()
    filtered = conn.sql(" select id,mark_type from df where mark_type::int in ( '2' ) or mark_type is null ")
    filtered.show()
def null_test():
    """Show how NULL values behave in ``!=`` and ``NOT IN`` filters.

    Uses a three-row DataFrame whose last row is entirely None.
    """
    samples = [
        {'a': 1, 'b': 'Flamingo', "c": "point(1 2)"},
        {'a': 2, 'b': 'Dog', "c": "point(1 2)"},
        {'a': None, 'b': None, "c": None},
    ]
    # `df` is referenced by name inside the SQL below; keep the variable name.
    df = pd.DataFrame(samples)
    queries = (
        "select * from df",
        # Author's note: empty result
        "select * from df where st_geometrytype(st_geomfromtext(c)) != 'POINT'",
        # Author's note: empty result
        "select a::integer from df where a not in (1,2) ",
    )
    for query in queries:
        conn.sql(query).show()
def udf_test():
    """Register Python static methods as DuckDB scalar UDFs and call them.

    Defines a local ``UDF`` class, registers each of its static methods on
    the shared connection, runs three of them, and prints how several Python
    type hints map to ``DuckDBPyType``.
    """

    class UDF(object):
        """Container whose static methods are exposed as DuckDB functions."""

        @classmethod
        def get_static_methods(cls):
            """Return ``[name, staticmethod_object]`` pairs declared on the class."""
            return [
                [attr_name, attr_value]
                for attr_name, attr_value in cls.__dict__.items()
                if isinstance(attr_value, staticmethod)
            ]

        @staticmethod
        def udf1(name: str) -> str:
            return name + "_udf1"

        @staticmethod
        def udf2(a: str) -> int:
            return int(a)

        @staticmethod
        def udf3(a):
            return [1, 2, 3]

        @staticmethod
        def udf4(a):
            return [1, 2, 3]

        @staticmethod
        def udf5(a: str) -> list[int]:
            return [1, 2, 3]

        @classmethod
        def reg(cls, duckdb_conn):
            """Register every static method of the class on ``duckdb_conn``."""
            # Options shared by every registration.
            common = {"null_handling": "special", "exception_handling": "return_null"}
            # udf3/udf4 carry no annotations, so their SQL types are given
            # explicitly; the others are registered without explicit types.
            explicit_types = {
                "udf3": {
                    "return_type": DuckDBPyType(list[int]),
                    "parameters": [DuckDBPyType(list[int])],
                },
                "udf4": {"return_type": 'BIGINT[]', "parameters": [VARCHAR]},
            }
            for attr_name, attr_value in cls.get_static_methods():
                # cls.__dict__ holds the staticmethod descriptor, which is only
                # callable on Python >= 3.10; register the plain function.
                func = attr_value.__func__
                duckdb_conn.create_function(
                    attr_name, func, **explicit_types.get(attr_name, {}), **common
                )

    print(UDF.get_static_methods())
    # conn.create_function("aaa", UDF.udf1, null_handling="special", exception_handling="return_null")
    UDF.reg(conn)
    for call_sql in ("select udf3([1,2])", "select udf4('123')", "select udf5('123')"):
        conn.sql(call_sql).show()

    # Show how Python type hints translate into DuckDB types.
    print(DuckDBPyType(list[dict[Union[str, int], str]]))
    print(DuckDBPyType(list[int]))
    print(DuckDBPyType(list[str]))
    print(DuckDBPyType(dict[str, int]))
    print([DuckDBPyType(list[int])])
def polars_test():
    """Exchange a polars DataFrame with DuckDB and filter a list column.

    Casts ``age`` to string, drops rows with a null ``age``, reads the frame
    back from DuckDB as polars, prints each row and shows the ``list``
    elements that appear in a fixed set of values.
    """
    people = [
        {"name": "Alice", "age": 25, "city": "New York", "list": [1, 23]},
        {"name": "Bob", "age": 30, "city": "San Francisco", "list": [1, 23]},
        {"name": "Charlie", "age": None, "city": "Los Angeles", "list": [1, 23]},
    ]
    # Convert the list of dicts to a DataFrame, then cast and filter it.
    # `df` and `df_pl` are referenced by name inside the SQL below.
    df = (
        pl.from_dicts(people)
        .with_columns(pl.col('age').cast(pl.Utf8))
        .drop_nulls(subset=['age'])
    )
    ages = df['age'].to_list()
    print(ages)
    df_pl = conn.sql("select * from df").pl()
    print(df_pl)
    for record in df_pl.iter_rows(named=True):
        print(record)
    # conn.sql("select list_has_any(list,[12,14,35,34]) from df_pl").show()
    filter_sql = "select list_filter(list,x->list_contains([1,12,14,35,34],x)) from df_pl"
    conn.sql(filter_sql).show()
def st_collect():
    """Show ``st_collect`` over point geometries grouped by column ``a``."""
    point_rows = [{"a": "a", "geom": "point(1 1)"}, {"a": "a", "geom": "point(1 1)"}]
    # `df` is referenced by name inside the SQL below; keep the variable name.
    df = pl.from_dicts(point_rows)
    collected = conn.sql(" select a,st_collect(list(st_geomfromtext(geom))) from df group by a ")
    collected.show()
def row_number():
    """Show one row per ``(a, b)`` group using ``ROW_NUMBER()``.

    The window has no ORDER BY, so the SQL does not specify which row of a
    group is the one kept.
    """
    # Five sample rows: a == b, and c/d carry the 1-based row index.
    sample_rows = [
        {"a": key, "b": key, "c": f"c{index}", "d": f"d{index}"}
        for index, key in enumerate((1, 1, 2, 2, 3), start=1)
    ]
    # `b` and `t1` are referenced by name inside the SQL; keep these names.
    b = pl.from_dicts(sample_rows)
    dedup_sql = (
        "\n"
        "WITH ranked_rows AS (\n"
        "SELECT a, b, c, d,\n"
        "ROW_NUMBER() OVER (PARTITION BY a, b ) AS rn\n"
        "FROM b ) SELECT\n"
        "a, b, c, d\n"
        "FROM ranked_rows where rn = 1\n"
    )
    t1 = conn.sql(dedup_sql)
    conn.sql("select * from t1").show()
def ext_exits():
    """Print the ``(installed, loaded)`` row for the spatial extension."""
    status_sql = "SELECT installed,loaded FROM duckdb_extensions() WHERE extension_name = 'spatial';"
    print(conn.execute(status_sql).fetchone())
if __name__ == '__main__':
    # Time a single experiment; the commented lines are other experiments
    # that can be swapped in.
    # a = time.time()
    # postgres_scan()
    # print(time.time() - a)
    started_at = time.time()
    # df3 = get_df()
    # conn.sql("select * from df3").show()
    # null_test()
    udf_test()
    elapsed_seconds = time.time() - started_at
    print(elapsed_seconds)
def wdbtest():
    """Round-trip a hex WKB geometry and run the POLYGON-type check on it.

    Decodes the hex string with shapely, prints the re-encoded hex, shows the
    result of ``ST_GeomFromHexWKB`` and then shows the rows of a one-row
    DataFrame whose geometry is empty or not a POLYGON.
    """
    from shapely import from_wkb

    # pg = plugin_common.fetch_pg('select geometry from wdb."HAD_OBJECT_ARROW" limit 10;', {"data_source": "wdb"})
    # print(pg)
    # The header bytes (01 | 02000080 | 04000000) mark this as a
    # little-endian LineString with the Z flag and four points.
    wkb_hex = "010200008004000000C2A98BEB8A1A5D407C98DE7AA40244402D431CEBE2360A3F8136A417961A5D4044D72A66A90244402D431CEBE2360A3F7E5C34C7A61A5D408DF0B404B20244402D431CEBE2360A3FC68A3B5AAC1A5D400A638F79B20244402D431CEBE2360A3F"
    a_geom = from_wkb(wkb_hex)
    print(a_geom.wkb_hex)
    # print(f'select ST_GeomFromHexWKB({a_geom.wkt})')
    conn.sql(
        "\n"
        f"select ST_GeomFromHexWKB('{a_geom.wkb_hex}')\n"
    ).show()
    # conn.sql(
    # f"""
    # select ST_GeomFromHexWKB(ST_AsHEXWKB( st_geomfromtext('{a_geom.wkt}')))
    # """).show()

    # pd.DataFrame() cannot be built from a bare string (it raises
    # ValueError), and the query below needs `id` and `geometry` columns, so
    # wrap the decoded geometry's WKT in a one-row frame.
    # NOTE(review): the `id` value is a placeholder -- confirm what was intended.
    # `df` and `t1` are referenced by name inside the SQL; keep these names.
    df = pd.DataFrame([{"id": "wkb_sample", "geometry": a_geom.wkt}])
    # t1 = conn.sql("select id,st_isvalid(st_geomfromtext(geometry)) as val from df")
    # conn.sql("select * from t1 where val is FALSE ").show()
    # t1 = conn.sql(" select id,st_geometrytype(st_geomfromtext(geometry)) as val from df ")
    # conn.sql("select * from t1 where val != 'POLYGON' ").show()
    t1 = conn.sql(
        " select id,geometry from df where geometry = '' or st_geometrytype(st_geomfromtext(geometry)) != 'POLYGON' ")
    conn.sql("select * from t1 ").show()
if __name__ == '__main__':
    # Checks whether each fill-area geometry intersects the collected
    # geometries of its lane, materialising every step as a DuckDB table.
    # NOTE(review): get_yaml_cfg_by_key, get_objid_laneid_rows and
    # plugin_hdmap_util are not imported or defined in this file -- confirm
    # where they come from before running this block.
    job_info_cfg = {"data_source": "wdb", "fetchdb_where_and": ""}
    hdmap_db_cfg = get_yaml_cfg_by_key("wdb")
    sql = (
        " select id,st_astext(geometry) as geometry\n"
        f"from {hdmap_db_cfg['schema']}.\"HAD_OBJECT_FILL_AREA\"\n"
        "where mesh in( '20596658','20596623')\n"
    )
    rows = db_util.fetch_pg(hdmap_db_cfg, sql)
    rows_dict = list(map(dict, rows))
    layer_df = plugin_common.rows2df(rows_dict)
    print(layer_df.head(2))
    layer_df = layer_df.drop_nulls(subset=['id'])
    # Distinct object ids as strings.
    obj_id_vals = {str(value) for value in layer_df['id'].to_list()}
    objid_laneid_rows = get_objid_laneid_rows(obj_id_vals, job_info_cfg)
    if objid_laneid_rows:
        objid_laneid_df = plugin_common.rows2df(objid_laneid_rows)
        lane_ids = [entry['lane_id'] for entry in objid_laneid_rows]
        boundid_llaneid_rlaneid_rows = plugin_hdmap_util.fetch_boundid_llaneid_rlaneid_rows(lane_ids, job_info_cfg)
        if boundid_llaneid_rlaneid_rows:
            boundid_llaneid_rlaneid_df = plugin_common.rows2df(boundid_llaneid_rlaneid_rows)
            # The DataFrame variables above are referenced by name in the SQL.
            with plugin_common.get_duckdb_conn_context() as duckdb_conn:
                # Pair each object/lane with the bounds on either side of the lane.
                duckdb_conn.sql(
                    "create table df as "
                    "select a.obj_id,a.lane_id,b.lane_bound_id, geometry"
                    " from objid_laneid_df a, boundid_llaneid_rlaneid_df b "
                    " where a.lane_id::text=b.l_lane_id::text "
                    " or a.lane_id::text=b.r_lane_id::text"
                )
                # Keep one row per (lane_id, lane_bound_id) and parse the WKT.
                duckdb_conn.sql(
                    " create table df2 as\n"
                    "WITH ranked_rows AS (\n"
                    "SELECT obj_id,lane_id,lane_bound_id,geometry,\n"
                    "ROW_NUMBER() OVER (PARTITION BY lane_id,lane_bound_id ) AS rn\n"
                    "FROM df )\n"
                    "SELECT\n"
                    "obj_id,lane_id,lane_bound_id,st_geomfromtext(geometry) as geometry\n"
                    "FROM ranked_rows where rn = 1\n"
                )
                # Collect the geometries of each lane into one geometry.
                duckdb_conn.sql(
                    "create table df3 as "
                    "select list(obj_id)[1] as obj_id,lane_id,st_collect(list(geometry))as multi_geom "
                    " from df2 group by lane_id "
                )
                # Join back to the fill-area layer on the object id.
                duckdb_conn.sql(
                    "create table df4 as select b.obj_id,b.lane_id,multi_geom,a.geometry "
                    " from layer_df a, df3 b"
                    " where b.obj_id = a.id"
                )
                intersect_sql = (
                    "select obj_id,lane_id,st_intersects(multi_geom,st_geomfromtext(geometry)) "
                    " from df4 "
                )
                duckdb_conn.sql(intersect_sql).show()
if __name__ == '__main__':
    # Checks whether each fill-area geometry intersects the collected
    # geometries of its lane, chaining DuckDB relations (t1..t4) instead of
    # creating tables.
    # NOTE(review): get_yaml_cfg_by_key, get_objid_laneid_rows and
    # plugin_hdmap_util are not imported or defined in this file -- confirm
    # where they come from before running this block.
    job_info_cfg = {"data_source": "wdb", "fetchdb_where_and": ""}
    hdmap_db_cfg = get_yaml_cfg_by_key("wdb")
    sql = (
        " select id,st_astext(geometry) as geometry\n"
        f"from {hdmap_db_cfg['schema']}.\"HAD_OBJECT_FILL_AREA\"\n"
        "where mesh in( '20596658','20596623')\n"
    )
    rows = db_util.fetch_pg(hdmap_db_cfg, sql)
    rows_dict = list(map(dict, rows))
    layer_df = plugin_common.rows2df(rows_dict)
    print(layer_df.head(2))
    layer_df = layer_df.drop_nulls(subset=['id'])
    # Distinct object ids as strings.
    obj_id_vals = {str(value) for value in layer_df['id'].to_list()}
    objid_laneid_rows = get_objid_laneid_rows(obj_id_vals, job_info_cfg)
    if objid_laneid_rows:
        objid_laneid_df = plugin_common.rows2df(objid_laneid_rows)
        lane_ids = [entry['lane_id'] for entry in objid_laneid_rows]
        boundid_llaneid_rlaneid_rows = plugin_hdmap_util.fetch_boundid_llaneid_rlaneid_rows(lane_ids, job_info_cfg)
        if boundid_llaneid_rlaneid_rows:
            boundid_llaneid_rlaneid_df = plugin_common.rows2df(boundid_llaneid_rlaneid_rows)
            # The DataFrames above and the relations t1..t4 below are
            # referenced by name in the SQL, so their names must not change.
            with plugin_common.get_duckdb_conn_context() as duckdb_conn:
                # Pair each object/lane with the bounds on either side of the lane.
                pair_sql = (
                    " select a.obj_id,a.lane_id,b.lane_bound_id, geometry"
                    " from objid_laneid_df a, boundid_llaneid_rlaneid_df b "
                    " where a.lane_id::text=b.l_lane_id::text "
                    " or a.lane_id::text=b.r_lane_id::text"
                )
                t1 = duckdb_conn.sql(pair_sql)
                # Keep one row per (lane_id, lane_bound_id) and parse the WKT.
                dedup_sql = (
                    "\n"
                    "WITH ranked_rows AS (\n"
                    "SELECT obj_id,lane_id,lane_bound_id,geometry,\n"
                    "ROW_NUMBER() OVER (PARTITION BY lane_id,lane_bound_id ) AS rn\n"
                    "FROM t1 )\n"
                    "SELECT\n"
                    "obj_id,lane_id,lane_bound_id,st_geomfromtext(geometry) as geometry\n"
                    "FROM ranked_rows where rn = 1\n"
                )
                t2 = duckdb_conn.sql(dedup_sql)
                # Collect the geometries of each lane into one geometry.
                collect_sql = (
                    " select list(obj_id)[1] as obj_id,lane_id,st_collect(list(geometry))as multi_geom "
                    " from t2 group by lane_id "
                )
                t3 = duckdb_conn.sql(collect_sql)
                # Join back to the fill-area layer on the object id.
                join_sql = (
                    " select b.obj_id,b.lane_id,multi_geom,a.geometry "
                    " from layer_df a, t3 b"
                    " where b.obj_id = a.id"
                )
                t4 = duckdb_conn.sql(join_sql)
                intersect_sql = (
                    "select obj_id,lane_id,st_intersects(multi_geom,st_geomfromtext(geometry)) "
                    " from t4 "
                )
                duckdb_conn.sql(intersect_sql).show()
# 网友评论 (web-page footer residue, "reader comments"; not part of the script)