import sqlparse
from collections import namedtuple
"""
contextManager
"""
def split(s: str):
    """
    Split a SQL string into a tuple of lower-cased tokens.

    Tokens are separated by any run of whitespace (spaces, tabs, newlines),
    so a statement spread over several lines tokenizes the same way as the
    single-line form. Stand-alone ``;`` tokens are dropped and trailing
    semicolons are stripped from the last token.

    :param s: sql String
    :return: tuple of lower-cased tokens; empty tuple when nothing is left
    """
    # str.split() with no argument splits on every kind of whitespace and
    # never yields empty strings, so newlines act as separators instead of
    # gluing the neighbouring tokens together.
    tokens = [tok for tok in s.split() if tok != ";"]
    if not tokens:
        return tuple()
    tokens[-1] = tokens[-1].rstrip(";")
    # The last token may have consisted only of semicolons (e.g. ";;");
    # drop it rather than returning an empty-string token.
    return tuple(tok.lower() for tok in tokens if tok)
def sqltuple2json(t: tuple):
    """
    Print the token that follows the first ``from`` in *t*.

    An empty line is printed when *t* has no ``from`` token or when
    ``from`` is the last token (no table name follows it).

    :param t: tuple of SQL tokens
    :return: None; the table name is only printed
    """
    from_tag = "from"
    table_name = ""
    if from_tag in t:
        name_pos = t.index(from_tag) + 1
        # Guard against a dangling "from" at the end of the statement,
        # which would otherwise raise IndexError.
        if name_pos < len(t):
            table_name = t[name_pos]
    print(table_name)
# Supported SQL statement keywords; each keyword maps to itself, so a lookup
# doubles as a membership check (unknown keywords raise KeyError).
SqlType = {
    keyword: keyword
    for keyword in (
        "select",
        "create",
        "drop",
        "alter",
        "insert",
        "explain",
        "update",
        "delete",
    )
}
def get_sql_type(t: tuple):
    """
    Return the statement type of a tokenized SQL statement.

    The type is looked up in ``SqlType`` using the first token of *t*.

    :param t: tuple of SQL tokens
    :return: the ``SqlType`` entry for the first token
    :raises IndexError: if *t* is empty
    :raises KeyError: if the first token is not a key of ``SqlType``
    """
    leading_keyword = t[0]
    return SqlType[leading_keyword]
def get_collection(t: tuple):
    """
    Return the collection (table) name of a tokenized SELECT statement.

    :param t: tuple of SQL tokens
    :return: the token following the first ``from``; ``""`` when there is
        no ``from`` token or nothing follows it; ``None`` when the
        statement is not a SELECT
    """
    # Compare by value: ``is`` against a string literal only works by
    # accident of interning and emits a SyntaxWarning on modern Python.
    if get_sql_type(t) != "select":
        return None
    from_tag = "from"
    if from_tag not in t:
        return ""
    name_pos = t.index(from_tag) + 1
    # A dangling "from" at the end has no table name after it.
    return t[name_pos] if name_pos < len(t) else ""
class Parse:
    """Translate tokenized SQL statements into dictionaries.

    Only SELECT statements without a WHERE clause are translated; every
    other input makes ``parse`` return ``None``.
    """

    __slots__ = ()

    @staticmethod
    def parse(t: tuple):
        """
        Build a projection dictionary from a tokenized SELECT statement.

        :param t: tuple of lower-cased SQL tokens
        :return: ``{}`` for ``select *``; otherwise ``{column: 1, ...}``
            (plus ``"_id": 0`` when an ``id`` column is requested);
            ``None`` for non-SELECT statements and for SELECTs that carry
            a WHERE condition
        :raises SQLMongoException: if no table name follows ``from``
        """
        # Compare by value: ``is`` against a string literal only works by
        # accident of interning and emits a SyntaxWarning on modern Python.
        if get_sql_type(t) != "select":
            return None

        from_pos = t.index("from") if "from" in t else -1
        # A missing "from" and a dangling "from" at the very end both mean
        # there is no collection name to work with.
        table_name = t[from_pos + 1] if 0 <= from_pos < len(t) - 1 else ""
        if not table_name:
            raise SQLMongoException("collection is empty")

        condition = t[t.index("where") + 1:] if "where" in t else ()
        if condition:
            # WHERE clauses are not translated; nothing is returned for them.
            return None

        columns = Parse._get_columns(t[t.index("select") + 1:from_pos])
        if columns == ("*",):
            return {}

        projection = {column: 1 for column in columns}
        # NOTE(review): "_id" is suppressed only when an "id" column is
        # requested; confirm this is the intended rule.
        if "id" in columns:
            projection["_id"] = 0
        return projection

    @staticmethod
    def _get_columns(t: tuple) -> tuple:
        """
        Extract column names from the tokens between ``select`` and ``from``.

        Commas are treated purely as separators, so ``("a,", "b")``,
        ``("a", ",", "b")`` and ``("a,b",)`` all yield ``("a", "b")``.

        :param t: tuple of tokens forming the column list
        :return: tuple of column names with commas and empty pieces removed
        """
        return tuple(
            name
            for token in t
            for name in token.split(",")
            if name
        )
class SQLMongoException(Exception):
    """Error raised when a SQL statement cannot be translated."""
if __name__ == '__main__':
    # Demo run: tokenize a statement, echo it through sqlparse.format, and
    # print the dictionary produced by Parse.parse.
    # The statement below is a sample value: the script used ``b`` without
    # ever assigning it, which raised NameError on start-up.
    b = "select id, name from users;"
    r = split(b)
    s = sqlparse.format(b)
    print("---", s)
    parsed = Parse.parse(r)
    print(parsed)
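# "Reader comments" - stray heading text, apparently left over from the page this code was copied from; not part of the program.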