connect
连接是线程安全的,可以在多个线程之间共享。有关详细信息,请参阅 线程和进程安全。
连接可以用作上下文管理器。请注意,上下文包装事务:如果上下文成功退出,则提交事务,如果退出并出现异常,则回滚事务。请注意,连接不会被上下文关闭,它可以用于多个上下文。
conn = psycopg2.connect(DSN)
with conn:
with conn.cursor() as curs:
curs.execute(SQL1)
with conn:
with conn.cursor() as curs:
curs.execute(SQL2)
# leaving contexts doesn't close the connection
conn.close()
cursor
从同一连接创建的游标不是孤立的,即一个游标对数据库所做的任何更改都可以立即被其他游标看到。从不同连接创建的游标可以或不能隔离,具体取决于连接的隔离级别。另见rollback()
和 commit()
方法。
游标不是线程安全的:多线程应用程序可以从同一个连接创建多个游标,并且应该使用来自单个线程的每个游标。有关详细信息,请参阅线程和进程安全。
游标可以用作上下文管理器:离开上下文将关闭游标。
with conn.cursor() as curs:
curs.execute(SQL)
# the cursor is now closed
事务
从 version 2.5 开始,psycopg2 的连接和游标是上下文管理器,可以与with语句一起使用:
with psycopg2.connect(DSN) as conn:
with conn.cursor() as curs:
curs.execute(SQL)
当连接退出with块时,如果块没有引发异常,则提交事务。在异常的情况下,事务被回滚。
当游标退出with块时,它被关闭,释放最终与其关联的任何资源。事务状态不受影响。
一个连接可以在多个with语句中使用,并且每个with块都有效地包装在一个单独的事务中:
conn = psycopg2.connect(DSN)
with conn:
with conn.cursor() as curs:
curs.execute(SQL1)
with conn:
with conn.cursor() as curs:
curs.execute(SQL2)
conn.close()
警告 与文件对象或其他资源不同,退出连接的 with块不会关闭连接,而只会关闭与其关联的事务。如果你想确保连接在某个时间点后关闭,你仍然应该使用 try-catch 块:
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_batch, execute_values
from tool.logger_config import logger
import yaml
st_geomfromtext = "st_geomfromtext('point(%s %s)',4326)"
st_geomfromtext2 = "st_geomfromtext(%s,4326)"
def list_join_in(items, sep):
return sep.join("'" + str(item) + "'" for item in items)
def list_join(items, sep):
return sep.join(str(item) for item in items)
def get_conn(host, port, user, database, password):
conn = psycopg2.connect(database=database, user=user, password=password,
host=host, port=port)
logger.info("connected success")
return conn
def close_conn(conn):
try:
conn.close()
except:
pass
def batch_execute(cursor, sql, val_list, page_size=1000):
if val_list:
execute_batch(cursor, sql, val_list, page_size)
def values_execute(cursor, sql, val_list, page_size=1000):
if val_list:
execute_values(cursor, sql, val_list, template=None, page_size=page_size)
def many_execute(cursor, sql, val_list):
if val_list:
cursor.executemany(sql, val_list)
def get_insert_sql(table_name, insert_fields: list, has_geom: bool) -> str:
if has_geom:
sql = f"""
insert into {table_name} ({list_join(insert_fields, ',')})
values({"%s," * (len(insert_fields) - 1) + "st_geomfromtext(%s,4326)"})
"""
else:
sql = f"""
insert into {table_name} ({list_join(insert_fields, ',')})
values({"%s," * (len(insert_fields) - 1) + "%s"})
"""
return sql
def obj2list(fields, obj_list):
val_list = []
for obj in obj_list:
val_list.append([getattr(obj, field) for field in fields])
return val_list
def get_yaml_db(db_name, cfg_file: Path) -> tuple:
cfg = yaml.load(cfg_file.read_text(), Loader=yaml.FullLoader)
database = cfg[db_name]['database']
user = cfg[db_name]['user']
password = cfg[db_name]['password']
host = cfg[db_name]['host']
port = cfg[db_name]['port']
return host, port, database, user, password,
网友评论