前言
pandas.io.sql
模块封装了一组查询方法,既方便了数据检索,又减少了对特定数据库 API
的依赖。
如果安装了 SQLAlchemy
,SQLAlchemy
将会提供对数据库的抽象。此外,你还需要安装对应数据库的驱动库。
例如,PostgreSQL
对应 psycopg2
库,MySQL
对应 pymysql
库。而 SQLite
默认已经包含在标准库中。
如果没有安装 SQLAlchemy
,则仅支持 sqlite
。
主要函数有:
# 将 SQL 数据表读入 DataFrame
read_sql_table(table_name, con[, schema, …])
# 将 SQL 查询读入 DataFrame
read_sql_query(sql, con[, index_col, …])
# 将 SQL 数据表或查询读入 DataFrame
read_sql(sql, con[, index_col, …])
# 将存储在 DataFrame 中的记录写入 SQL 数据库
DataFrame.to_sql(name, con[, schema, …])
注意:
read_sql()
是 read_sql_table()
和 read_sql_query()
的封装,会根据输入自动分配给对应的函数
在下面的例子中,我们使用 SQlite
的 SQL
数据库引擎。你可以使用一个临时的 SQLite
数据库,并将数据存储在内存中
可以使用 create_engine()
函数从数据库 URI
创建引擎对象,并与 SQLAlchemy
进行连接。您只需要为每个连接的数据库创建一次引擎。
In [521]: from sqlalchemy import create_engine
# Create your engine.
In [522]: engine = create_engine("sqlite:///:memory:")
可以通过类似下面的连接来管理你的连接
with engine.connect() as conn, conn.begin():
data = pd.read_sql_table("data", conn)
1 写入 DataFrame
假设我们有以下 DataFrame
,我们可以使用 to_sql()
将其插入到数据库中
>>> data = pd.DataFrame({
'id': [26, 42, 63],
'Date': pd.date_range('2012-10-18', periods=3),
'Col_1': list('XYZ'),
'Col_2': [25.5, -12.5, 5.73],
'Col_3': [True, False, True]
})
插入到数据库
In [523]: data
Out[523]:
id Date Col_1 Col_2 Col_3
0 26 2010-10-18 X 27.50 True
1 42 2010-10-19 Y -12.50 False
2 63 2010-10-20 Z 5.73 True
In [524]: data.to_sql("data", engine)
对于某些数据库,由于超过了数据包大小的限制,写入大的 DataFrames
会导致错误。在调用 to_sql
时,可以通过设置 chunksize
参数来避免这种情况
例如,下面的代码每次以 1000
行的批量向数据库写入数据
In [525]: data.to_sql("data_chunked", engine, chunksize=1000)
1.1 SQL 数据类型
to_sql()
会根据数据的 dtype
属性尝试将你的数据映射到一个合适的 SQL
数据类型。
对于数据中 dtype
为 object
的列,pandas
会尝试推断数据的类型。
你可以通过传入字典的方式为列指定类型,例如,为字符串列指定 sqlalchemy
字符串类型而不是默认的文本类型
In [526]: from sqlalchemy.types import String
In [527]: data.to_sql("data_dtype", engine, dtype={"Col_1": String})
注意:
由于不同数据库对时间间隔的限制,timedelta64
类型的列会转换为整数纳秒值,同时发出警告信息
category
类型的列会被转换为密集表示,因此,从数据库中读取表时不会自动转换为分类类型
2 日期时间数据类型
通过使用 SQLAlchemy
,to_sql()
能够写出带有时区或不带时区的日期时间格式的数据。但是,具体的方式取决于所使用的数据库系统对日期时间格式数据的支持
当向不支持时区的数据库写入支持时区的数据时,数据将被写入为时区的时间戳,与时区相比,该时间戳对应的是本地时间
read_sql_table()
能够读取支持时区或无时区的 datetime
数据。当读取带有时区类型的时间戳时,pandas
会将数据转换为 UTC
2.1 插入方法
method
参数控制 SQL
插入语句的使用,可以用如下选项:
-
None
: 使用标准的SQL
插入语句(一行插入一条) -
'multi'
: 在一条INSERT
语句中插入多个值 - 带有
(pd_table, conn, keys, data_iter)
参数的函数: 可以根据特定的后台语言特性,实现高性能的插入方法
下面的例子是 PostgreSQL
的 COPY
语句
# Alternative to_sql() *method* for DBs that support COPY FROM
import csv
from io import StringIO
def psql_insert_copy(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
3 读取表
read_sql_table()
能够读取给定数据库表名和需要读取的列的子集
注意:在使用 read_sql_table()
之前,确保安装了 SQLAlchemy
可选依赖项
In [528]: pd.read_sql_table("data", engine)
Out[528]:
index id Date Col_1 Col_2 Col_3
0 0 26 2010-10-18 X 27.50 True
1 1 42 2010-10-19 Y -12.50 False
2 2 63 2010-10-20 Z 5.73 True
注意:
pandas
从查询结果中推断数据列的类型,而不是通过物理数据库模式中查找数据类型。
例如,假设 userid
是一个整数列,那么 select userid ...
将会返回一个整数类型的 Series
,而 select cast(userid as text) ...
将会返回一个 object
类型的 Series
。
如果查询为空,则所有列都转换为 object
类型。
你也可以指定列名作为 DataFrame
索引,并指定要读取的列的子集。
In [529]: pd.read_sql_table("data", engine, index_col="id")
Out[529]:
index Date Col_1 Col_2 Col_3
id
26 0 2010-10-18 X 27.50 True
42 1 2010-10-19 Y -12.50 False
63 2 2010-10-20 Z 5.73 True
In [530]: pd.read_sql_table("data", engine, columns=["Col_1", "Col_2"])
Out[530]:
Col_1 Col_2
0 X 27.50
1 Y -12.50
2 Z 5.73
你可以显式地将列强制解析为日期:
In [531]: pd.read_sql_table("data", engine, parse_dates=["Date"])
Out[531]:
index id Date Col_1 Col_2 Col_3
0 0 26 2010-10-18 X 27.50 True
1 1 42 2010-10-19 Y -12.50 False
2 2 63 2010-10-20 Z 5.73 True
如果需要的话,你可以显式地指定一个格式化字符串,或者一个参数字典来传递给 pandas.to_datetime()
pd.read_sql_table("data", engine, parse_dates={"Date": "%Y-%m-%d"})
pd.read_sql_table(
"data",
engine,
parse_dates={"Date": {"format": "%Y-%m-%d %H:%M:%S"}},
)
您可以使用 has_table()
检查表是否存在
4 模式支持
read_sql_table()
和 to_sql()
函数中的 schema
关键字支持不同模式的读写。但是请注意,这取决于数据库(sqlite
不支持模式)。例如
df.to_sql("table", engine, schema="other_schema")
pd.read_sql_table("table", engine, schema="other_schema")
5 查询
你可以在 read_sql_query()
函数中使用原始 SQL
进行查询。在这种情况下,你必须使用适合你的数据库的 SQL
变体。
当使用 SQLAlchemy
时,你也可以传递 SQLAlchemy
表达式语言结构,这些结构与数据库无关。
In [532]: pd.read_sql_query("SELECT * FROM data", engine)
Out[532]:
index id Date Col_1 Col_2 Col_3
0 0 26 2010-10-18 00:00:00.000000 X 27.50 1
1 1 42 2010-10-19 00:00:00.000000 Y -12.50 0
2 2 63 2010-10-20 00:00:00.000000 Z 5.73 1
当然,你可以指定更复杂的查询
In [533]: pd.read_sql_query("SELECT id, Col_1, Col_2 FROM data WHERE id = 42;", engine)
Out[533]:
id Col_1 Col_2
0 42 Y -12.5
read_sql_query()
函数支持 chunksize
参数。指定这个参数将返回一个查询结果的迭代器。
In [534]: df = pd.DataFrame(np.random.randn(20, 3), columns=list("abc"))
In [535]: df.to_sql("data_chunks", engine, index=False)
In [536]: for chunk in pd.read_sql_query("SELECT * FROM data_chunks", engine, chunksize=5):
.....: print(chunk)
.....:
a b c
0 0.092961 -0.674003 1.104153
1 -0.092732 -0.156246 -0.585167
2 -0.358119 -0.862331 -1.672907
3 0.550313 -1.507513 -0.617232
4 0.650576 1.033221 0.492464
a b c
0 -1.627786 -0.692062 1.039548
1 -1.802313 -0.890905 -0.881794
2 0.630492 0.016739 0.014500
3 -0.438358 0.647275 -0.052075
4 0.673137 1.227539 0.203534
a b c
0 0.861658 0.867852 -0.465016
1 1.547012 -0.947189 -1.241043
2 0.070470 0.901320 0.937577
3 0.295770 1.420548 -0.005283
4 -1.518598 -0.730065 0.226497
a b c
0 -2.061465 0.632115 0.853619
1 2.719155 0.139018 0.214557
2 -1.538924 -0.366973 -0.748801
3 -0.478137 -1.559153 -3.097759
4 -2.320335 -0.221090 0.119763
你也可以用 execute()
运行一个普通的查询,而不需要创建一个 DataFrame
这对于不返回值的查询很有用,比如 INSERT
。这在功能上等同于在 SQLAlchemy
引擎或 db
连接对象上调用执行。
from pandas.io import sql
sql.execute("SELECT * FROM table_name", engine)
sql.execute(
"INSERT INTO table_name VALUES(?, ?, ?)", engine, params=[("id", 1, 12.2, True)]
)
6 引擎连接实例
要连接 SQLAlchemy
,你可以使用 create_engine()
函数从数据库 URI
中创建一个引擎对象。你只需要为你连接的每个数据库创建一次引擎。
from sqlalchemy import create_engine
engine = create_engine("postgresql://scott:tiger@localhost:5432/mydatabase")
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
engine = create_engine("mssql+pyodbc://mydsn")
# sqlite://<nohostname>/<path>
# where <path> is relative:
engine = create_engine("sqlite:///foo.db")
# or absolute, starting with a slash:
engine = create_engine("sqlite:////absolute/path/to/foo.db")
7 高级 SQLAlchemy 查询
可以使用 SQLAlchemy
结构来描述查询
使用 sqlalchemy.text()
以后端无关的方式指定查询参数
In [537]: import sqlalchemy as sa
In [538]: pd.read_sql(
.....: sa.text("SELECT * FROM data where Col_1=:col1"), engine, params={"col1": "X"}
.....: )
.....:
Out[538]:
index id Date Col_1 Col_2 Col_3
0 0 26 2010-10-18 00:00:00.000000 X 27.5 1
如果你的数据库有 SQLAlchemy
描述,那么可以使用 SQLAlchemy
表达式来表示 where
条件
In [539]: metadata = sa.MetaData()
In [540]: data_table = sa.Table(
.....: "data",
.....: metadata,
.....: sa.Column("index", sa.Integer),
.....: sa.Column("Date", sa.DateTime),
.....: sa.Column("Col_1", sa.String),
.....: sa.Column("Col_2", sa.Float),
.....: sa.Column("Col_3", sa.Boolean),
.....: )
.....:
In [541]: pd.read_sql(sa.select([data_table]).where(data_table.c.Col_3 is True), engine)
Out[541]:
Empty DataFrame
Columns: [index, Date, Col_1, Col_2, Col_3]
Index: []
您可以使用 sqlalchemy.bindparam()
将参数和的 SQLAlchemy
表达式一起传递给 read_sql()
In [542]: import datetime as dt
In [543]: expr = sa.select([data_table]).where(data_table.c.Date > sa.bindparam("date"))
In [544]: pd.read_sql(expr, engine, params={"date": dt.datetime(2010, 10, 18)})
Out[544]:
index Date Col_1 Col_2 Col_3
0 1 2010-10-19 Y -12.50 False
1 2 2010-10-20 Z 5.73 True
8 sqlite
可以直接使用 sqlite
创建连接
import sqlite3
con = sqlite3.connect(":memory:")
然后,执行查询
data.to_sql("data", con)
pd.read_sql_query("SELECT * FROM data", con)
网友评论