import pymssql
class ConnectUtils:
    """Thin helper around a single ``pymssql`` connection for browsing a server.

    The methods list databases and tables, stream the rows of a table, and
    count the rows in which a value occurs in any column of a given system
    type. Identifiers supplied by the caller (database, table and column
    names) are bracket-delimited before being placed into SQL text, and values
    are either passed as query parameters or escaped as string literals.

    The instance can be used as a context manager; leaving the ``with`` block
    closes the connection.
    """

    # NOTE(security): these defaults are kept only so existing callers that
    # rely on ``ConnectUtils()`` keep working. Credentials should not live in
    # source control; prefer passing ``server``/``user``/``password``.
    DEFAULT_SERVER = '10.35.16.60'
    DEFAULT_USER = 'qadu008'
    DEFAULT_PASSWORD = '!zyn123!'

    def __init__(self, db_name='tempdb', server=None, user=None, password=None):
        """Open the connection.

        :param db_name: database to connect to.
        :param server: server address; ``None`` selects ``DEFAULT_SERVER``.
        :param user: login name; ``None`` selects ``DEFAULT_USER``.
        :param password: login password; ``None`` selects ``DEFAULT_PASSWORD``.
        """
        self.conn = pymssql.connect(
            self.DEFAULT_SERVER if server is None else server,
            self.DEFAULT_USER if user is None else user,
            self.DEFAULT_PASSWORD if password is None else password,
            db_name,
        )

    def __enter__(self):
        """Return the instance itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block are not suppressed."""
        self.close()
        return False

    def close(self):
        """Close the underlying connection."""
        self.conn.close()

    @staticmethod
    def _quote_part(part):
        """Return one identifier part delimited with square brackets.

        Surrounding whitespace is dropped. A part that is already delimited
        with ``[...]`` or ``"..."`` is unwrapped first so it is never quoted
        twice. Every ``]`` inside the name is doubled, which keeps the name
        from terminating the delimiter early. An empty part yields ``''``.
        """
        text = str(part).strip()
        if not text:
            return ''
        for opener, closer in (('[', ']'), ('"', '"')):
            if len(text) >= 2 and text[0] == opener and text[-1] == closer:
                text = text[1:-1].replace(closer * 2, closer)
                break
        return '[' + text.replace(']', ']]') + ']'

    @classmethod
    def _quote_identifier(cls, name):
        """Return a possibly multi-part name with every part bracket-delimited.

        Dots outside ``[...]`` / ``"..."`` delimiters separate the parts (for
        example ``schema.table``); dots inside delimiters belong to the name.
        Empty parts are preserved, so ``db..table`` stays ``[db]..[table]``.
        """
        text = str(name)
        parts = []
        current = []
        closer = None  # closing delimiter we are currently inside, if any
        pos = 0
        while pos < len(text):
            char = text[pos]
            if closer is None:
                if char == '.':
                    parts.append(''.join(current))
                    current = []
                else:
                    # A delimiter only opens at the start of a part.
                    if char in '["' and not ''.join(current).strip():
                        closer = ']' if char == '[' else '"'
                    current.append(char)
            else:
                current.append(char)
                if char == closer:
                    if text[pos + 1:pos + 2] == closer:
                        # Doubled closer is an escaped character, not the end.
                        current.append(closer)
                        pos += 1
                    else:
                        closer = None
            pos += 1
        parts.append(''.join(current))
        return '.'.join(cls._quote_part(part) for part in parts)

    @staticmethod
    def _quote_literal(value):
        """Return ``str(value)`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot end the
        literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    def _use_database(self, cursor, db_name):
        """Issue ``USE`` for *db_name* on *cursor*; do nothing when it is falsy."""
        if db_name:
            cursor.execute('USE ' + self._quote_part(db_name))

    def qeury_all_table_name(self, db_name=None):
        """Return the ``table_name`` values listed in ``information_schema.tables``.

        :param db_name: optional database to switch to before querying.
        :return: list of table names.
        """
        with self.conn.cursor() as cursor:
            self._use_database(cursor, db_name)
            cursor.execute('select table_name FROM information_schema.tables')
            return [row[0] for row in cursor.fetchall()]

    def qeury_all_table(self, db_name=None):
        """Return the name and object id of every row in ``sys.tables``.

        :param db_name: optional database to switch to before querying.
        :return: list of ``(name, object_id)`` tuples.
        """
        with self.conn.cursor() as cursor:
            self._use_database(cursor, db_name)
            cursor.execute('select name,object_id FROM sys.tables')
            return [(row[0], row[1]) for row in cursor.fetchall()]

    def query_any_field(self, table_name, object_id, target, system_type_id):
        """Count the rows of a table in which some column equals *target*.

        Only the columns of *object_id* whose ``system_type_id`` matches are
        compared. Per the original author's note, the server reports an error
        when *target* cannot be compared with the column type.

        :param table_name: table to scan (may be schema-qualified).
        :param object_id: ``object_id`` of that table in ``sys.columns``.
        :param target: value to look for; it is sent as a string literal.
        :param system_type_id: ``system_type_id`` of the columns to compare.
        :return: the matching row count, or ``0`` when no column has that type.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(
                'select name from sys.columns '
                'where object_id = %s and system_type_id = %s',
                (object_id, system_type_id),
            )
            columns = [row[0] for row in cursor.fetchall()]
            if not columns:
                return 0
            column_list = ','.join(self._quote_part(col) for col in columns)
            # No parameter tuple here on purpose: the statement contains
            # interpolated identifiers, which must not be scanned for
            # placeholders, so the value is escaped as a literal instead.
            cursor.execute(
                f'select count(*) from {self._quote_identifier(table_name)} '
                f'where {self._quote_literal(target)} in ({column_list})'
            )
            return cursor.fetchall()[0][0]

    def qeury_all_db_name(self):
        """Return the ``name`` of every row in ``sys.databases``."""
        with self.conn.cursor() as cursor:
            cursor.execute('select name FROM sys.databases;')
            return [row[0] for row in cursor.fetchall()]

    def query_table(self, table_name, db_name=None):
        """Yield every row of *table_name*, one at a time.

        This is a generator, so nothing is executed until iteration starts and
        the cursor stays open until the generator is exhausted or closed.

        :param table_name: table to read (may be schema-qualified).
        :param db_name: optional database to switch to before querying.
        """
        with self.conn.cursor() as cursor:
            self._use_database(cursor, db_name)
            cursor.execute(f'select * from {self._quote_identifier(table_name)}')
            yield from cursor

    # Correctly spelled aliases; the original misspelled names remain the
    # primary definitions so existing callers keep working.
    query_all_table_name = qeury_all_table_name
    query_all_table = qeury_all_table
    query_all_db_name = qeury_all_db_name
# Reader comments