美文网首页
python3 分批读写mysql

python3 分批读写mysql

作者: venuslf | 来源:发表于2019-09-30 16:44 被阅读0次

分批读取

import pandas as pd
import pymysql
from sqlalchemy import create_engine
pymysql.install_as_MySQLdb()

第一种读取数据的方式

#建立数据库连接
con=pymysql.connect(
  host="10.***.***.***",      #ip地址
  database="db1",        #需读取的数据表所在数据库的库名
  user="user",               #mysql用户名
  password="password",    #密码
  port=3306,       #端口号
  charset='utf8'    
)

cur = con.cursor()     #创建游标

#数据读取函数
def read_table(cur, sql): 
    try:
        cur.execute(sql) 
        d  = cur.fetchall()
        df = pd.DataFrame(list(d))
    except Exception as e:
        df = pd.DataFrame()
        print('read data from mysql failed:',sqli)
        print(e)
    return df

#设置分批的节点
def batch_generate(n, b):  #n 数据条数,b分批数目
    batch = []
    for i in range(0, n, int(n/b)): 
        batch.append(i)
    batch.append(n)
    return batch

batch_i = batch_generate(n=10050, b=10)

#分批读取数据,保存成dataframe。示例中mysql数据表中的数据可根据id分批
dat = pd.DataFrame()
for i in range(1,len(batch_i)):  
   # print(batch_i[i-1],batch_i[i])
    sqli = 'select * from db1.test where id>%d and id<=%d' % (batch_i[i-1], batch_i[i])       #sql语句
    d = read_table(cur=cur, sql=sqli)          #读取数据
    dat = pd.concat([dat,d], axis=0)            #按行合并数据

cur.close()    #关闭游标
con.close()    #关闭连接

第二种读取数据的方式

con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8') 

sql = 'select * from db1.test'
d = pd.read_sql(sql=sql, con=con_engine, chunksize=10)    #分10批读取数据,语句返回的是生成器

dat = pd.DataFrame()
for i in d:
    dat = pd.concat([dat,  i], axis = 0)
dat.index = range(len(dat))    #上述方法输出的index会有重复,更新index

分批写入mysql

第一种方式数据写入数据库

#建立连接
con=pymysql.connect(
  host="10.***.***.***",      #ip地址
  database="db1",        #需读取的数据表所在数据库的库名
  user="user",               #mysql用户名
  password="password",    #密码
  port=3306,       #端口号
  charset='utf8'    
)

cur = con.cursor()

#写入数据
batch_insert_i = batch_generate(n=1000,b=10)
for i in range(1,len(batch_insert_i)):
    sqli = 'insert into db1.test_py(id, w_no, c_code, s_code, c_dt, l_dt)  values(%s,%s,%s,%s,%s,%s)'
    dati = (dat.iloc[batch_insert_i[i-1]:batch_insert_i[i], :]).values.tolist()
    #print(batch_insert_i[i-1], batch_insert_i[i])
    try:
        # 执行sql语句
        cur.executemany(sqli,dati)
        con.commit()     # 提交到数据库执行
    except Exception as e:
        # 如果发生错误则退出,也可以不退出,回滚con.rollback()
        print(e)
        break   #con.rollback()

cur.close()    # 关闭游标
con.close()    # 关闭数据库连接

第二种方式写入数据库

con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8') 

dat.to_sql(con=con_engine, name='test_py', if_exists='append', index=False, chunksize=10) 
#name数据表名; if_exists='append'  若不存在test_py表则新建,若存在则追加写入。

注:
第二种读取和写入方式直接调用pandas连接mysql,在读写大规模数据时效率更高。

相关文章

网友评论

      本文标题:python3 分批读写mysql

      本文链接:https://www.haomeiwen.com/subject/ephhpctx.html