前言: 最近做内部运维数据的数据仓库,最终将Hive中的数据清洗后需要业务决策相关的数据进行结构化处理,并存储到关系型数据库MySQL中,以供后期对外接口使用。本篇简单记录下使用Python操作MySQL数据库的简单操作。
MySQL数据库环境准备
注意:在当前容器化基础设施已经全面覆盖的时代,为了快速验证效果,我们及其推荐使用以Docker为代表的容器化基础设施来快速构建你的基础环境。
在DockerHub上有丰富的基础中间件的镜像,我们可以使用Docker快速的构建我们的MySQL基础环境,而不必每次重新安装各种复杂的中间件环境,因为我们只是使用者,我相信每个团队都会有专门的中间件维护者。好吧,如果没有,那你依然可以自己根据实际的需求和标准进行构建Docker镜像,这样就为我们创造了一个未来很长一段时间可复用的组件。总之,想说的一件事就是,下面的MySQL环境是用Docker容器跑的。
# 确保docker环境正常
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
# 下载MySQL指定版本的镜像
$ docker pull mysql:5.6
$ docker images | grep mysql
mysql 5.6 d1f491b20727 2 days ago 256 MB
# 创建一个mysql实例[需要指定至少一个环境变量:MYSQL_ROOT_PASSWORD, MYSQL_ALLOW_EMPTY_PASSWORD and MYSQL_RANDOM_ROOT_PASSWORD]
$ docker run -itd --name mysql -e MYSQL_ROOT_PASSWORD="123456" -P mysql:5.6
6c4428b341516c7eeec48cbc5b658a464f76b5f7d42b3e689151392f5cd8ac56
# MySQL密码为123456,端口为32773
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6c4428b34151 mysql:5.6 "docker-entrypoint.sh" 5 seconds ago Up 3 seconds 0.0.0.0:32773->3306/tcp mysql
# mysql数据库登录测试
$ mysql -h 127.0.0.1 -uroot -P 32773 -p123456
...
Server version: 5.6.42 MySQL Community Server (GPL)
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
...
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
+--------------------+
3 rows in set (0.00 sec)
# 查看数据库字符编码格式
mysql> show variables like '%char%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | utf8 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | utf8 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.00 sec)
至此,一个MySQL数据库已经准备好,需要我们注意的是,因为使用的是官方的Docker image,我们需要进行相关配置的检查和设置,否则可能会为后期的操作造成一定麻烦,比如设置数据库的字符编码.
让MySQL支持中文,一般而言需要关注以下几个点:
- 修改MySQL配置中客户端和服务端的字符编码为
utf8
,分别为[mysqld的default-character-set和character-set-server参数以及client的default-character-set参数]
- 修改MySQL配置中客户端和服务端的字符编码为
- 创建表时指定表的字符编码(default charset=utf8;)
- 链接数据库的时候指定链接字符编码(charset=utf8)
- 使用Python操作数据库时需要对Python文件进行utf8支持(#encoding=utf-8和sys.setdefaultencoding(utf-8))
- 使用
show variables like '%char%';
命令检查mysql字符集是否为utf8格式
,并使用SET NAMES UTF8; 或者set character_set_server = utf8;
进行设置
- 使用
使用Python进行操作MySQL
首先,在使用之前我们需要对Python版的MySQL库有一个了解,当前主流的库有MySQLdb
,PyMySQL
和SQLAlchemy
.
-
MySQLdb
:一般是Linux系统发行版中默认支持的,通常包名为Python-MySQL
,核心由C语言打造,接口精炼,性能最棒,缺点是环境依赖较多,安装复杂,近两年已停止更新,只支持Python2,不支持Python3 -
PyMySQL
:纯python打造,接口与Python-MySQL兼容,安装方便,支持Python3 -
SQLAlchemy
: 一个ORM框架,它并不提供底层的数据库操作,而是要借助于MySQLdb、PyMySQL等第三方库来完成,目前SQLAlchemy在Web编程领域应用广泛
备注:其实还有类似mysqlclient之类的库,主要集成在一些web框架中作为依赖
由于为了快速实现业务逻辑,在接下来的操作中主要使用PyMySQL
库进行操作数据库,虽然性能不及MySQLdb
,但是可以使用pymysql.install_as_MySQLdb()
来兼容MySQLdb
,在业务正式上线时可以不改变业务代码逻辑而平滑的使用MySQLdb
库。
安装pymysql库
在Linux
环境下,大多数系统工具使用Python语言进行编写,因此在安装额外的Python模块时,通常会有几种选择:
- 使用系统自带工具安装
apt-get install or yum install
,该种方式会将模块默认安装的系统环境,可能会影响系统环境
- 使用系统自带工具安装
- 使用Python原声的包管理工具
pip install
,该种方式会默认安装到pip
命令所在的Python解释环境下,因此取决于Python环境是否独立于系统环境的Python,通常情况下会使用pyenv
之类的工具进行环境隔离
- 使用Python原声的包管理工具
- 使用包管理工具
conda
相关工具进行管理python,可以有效管理python多环境依赖,并且可以很方便构建数据科学相关环境.conda使用指南
- 使用包管理工具
# 安装pymysql库
$ pip install pymysql
or
$ conda install pymysql
python链接MySQL以及基本使用
使用pymysql库操作mysql
$ cat pytest_mysql.py
import pymysql
# 获取一个mysql链接对象
conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset='utf8')
# 使用数据库链接的cursor()方法获取一个游标对象
cursor = conn.cursor()
# 使用游标对象的execute()方法进行执行sql语句
cursor.execute("SELECT VERSION()")
## execute方法的定义如下,其中args可以是tuple, list or dict,如果是list or tuple的话,%s会被当做查询的一个占位符;如果是dict的话%(name)s会被当做一个占位符
## execute(self, query, args=None)
# 使用游标对象的fetch类方法获取数据
## fetchone返回一条数据,fetchall返回查询的所有数据。fetch类方法会返回一个list类型的tuple结构类型对象.[(),()...]
onedata = cursor.fetchone()
alldata = cursor.fetchall()
# 提交数据库操作[一般在更新数据库操作时需要注意执行]
conn.commit()
# 及时关闭数据库链接以及打开的游标[以防止在并发情况下系统打开连接数过多]
cursor.close()
conn.close()
尝试用python脚本进行数据库操作
$ cat test_show_table.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import pymysql
class mysqlObj:
def __init__(self,host,dbname,user,passwd,port=3306):
self.host = host
self.dbname = dbname
self.user = user
self.passwd = passwd
self.port = port
def mysqlConIns(self):
conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.dbname, charset='utf8')
return conn
def querydata(self,sql,args=None):
conn = self.mysqlConIns()
cur = conn.cursor()
cur.execute(sql,args)
alldata = cur.fetchall()
cur.close()
for data in alldata:
print(data)
conn.close()
if __name__ == '__main__':
mysqlobj = mysqlObj('localhost','mysql','root','123456',32773)
mysqlobj.querydata("show tables;")
# 对mysql库进行查看tables操作,返回的是一个tuple
$ python test_show_table.py
(u'columns_priv',)
(u'db',)
(u'event',)
(u'func',)
(u'general_log',)
(u'help_category',)
(u'help_keyword',)
(u'help_relation',)
(u'help_topic',)
(u'innodb_index_stats',)
(u'innodb_table_stats',)
(u'ndb_binlog_index',)
(u'plugin',)
(u'proc',)
(u'procs_priv',)
(u'proxies_priv',)
(u'servers',)
(u'slave_master_info',)
(u'slave_relay_log_info',)
(u'slave_worker_info',)
(u'slow_log',)
(u'tables_priv',)
(u'time_zone',)
(u'time_zone_leap_second',)
(u'time_zone_name',)
(u'time_zone_transition',)
(u'time_zone_transition_type',)
(u'user',)
MySQL数据库常用的一些操作
注意:在之前我们创建的MySQL实例中仅是一个空的数据库,在实际使用之前,我们需要进行数据库的库表结构创建,以及相关的数据库授权,而这一部分操作通常会由专业的数据库管理员(DBA)进行操作和处理
接下来对一个website
数据库和use
表进行操作:
mysql> create database website;
Query OK, 1 row affected (0.00 sec)
mysql> CREATE TABLE IF NOT EXISTS useinfo (userid int(10) primary key not null auto_increment,username varchar(20) not null,usersite varchar(50),other varchar(50)) DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.02 sec)
mysql> describe useinfo;
+----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+----------------+
| userid | int(10) | NO | PRI | NULL | auto_increment |
| username | varchar(20) | NO | | NULL | |
| usersite | varchar(50) | YES | | NULL | |
| other | varchar(50) | YES | | NULL | |
+----------+-------------+------+-----+---------+----------------+
4 rows in set (0.00 sec)
# 数据库授权[授权所有的主机可以以root用户,123456的密码去操作website库]
mysql> grant all on website.* to root@'%' identified by "123456";
Query OK, 0 rows affected (0.00 sec)
对指定数据库进行相关查询操作
# 依然是上面那个test_show_table.py脚本,不过我们改变一下`__main__`
$ cat test_show_table.py
.....
if __name__ == '__main__':
mysqlobj = mysqlObj('localhost','website','root','123456',32773)
args = ["show tables;","describe useinfo;"]
for arg in args:
mysqlobj.querydata(arg)
# 可以看到我们的website库下有useinfo一张表,并且该表包含userid,username,usersite,other4个字段
$ python test_show_table.py
(u'useinfo',)
(u'userid', u'int(10)', u'NO', u'PRI', None, u'auto_increment')
(u'username', u'varchar(20)', u'NO', u'', None, u'')
(u'usersite', u'varchar(20)', u'YES', u'', None, u'')
(u'other', u'varchar(20)', u'YES', u'', None, u'')
对MySQL数据库进行插入操作
注意:插入操作分为单条记录插入和批量插入,一般数据库都支持批量插入方法,在pysql中为cursor.executemany(sql,args)
为我们的mysqlObj类增加一个插入操作:
$ cat test_show_table.py
....
....
#注意插入数据时单条记录使用tuple()类型;批量插入数据时使用list()类型
def changedata(self,sql,args=None):
conn = self.mysqlConIns()
cur = conn.cursor()
try:
#做一个粗暴的判断当args是list时就进行批量插入
if isinstance(args,list):
#executemany(sql,args)方法args支持tuple或者list类型
cur.executemany(sql,args)
else:
#execute(sql,args)方法args支持string,tuple,list,dict
cur.execute(sql,args)
conn.commit()
except Exception as e:
conn.rollback()
print(e)
finally:
cur.close()
conn.close()
....
...
if __name__ == '__main__':
mysqlobj = mysqlObj('localhost','website','root','123456',32773)
'''
args = ["show tables;","describe useinfo;"]
for arg in args:
mysqlobj.querydata(arg)
'''
#插入一条数据
sql = "insert into useinfo (username) values(%s)"
arg = "彪哥"
mysqlobj.changedata(sql,arg)
sql1 = "insert into useinfo (username,usersite) values(%s,%s)"
arg1 = ("xxbandy","http://xxbandy.github.io")
mysqlobj.changedata(sql1,arg1)
#批量插入数据
argslist = [("彪哥","http://xxbandy.github.io"),("bgbiao","https://www.jianshu.com/u/9c46ece5b7bd")]
mysqlobj.changedata(sql1,argslist)
#查询数据
mysqlobj.querydata("select * from useinfo")
print("updating the data")
#更新数据[需要注意的是指定了字段之后由于usersite是varchar类型,占位符必须是"%s",如果是'%s'会有问题]
data = "https://my.oschina.net/xxbAndy"
mysqlobj.changedata('update useinfo set usersite=%s where userid = 1',data)
mysqlobj.querydata("select * from useinfo")
# 插入数据并查看数据
$ python /tmp/abc.py
(1, u'\u5f6a\u54e5', None, None)
(2, u'xxbandy', u'http://xxbandy.github.io', None)
(3, u'\u5f6a\u54e5', u'http://xxbandy.github.io', None)
(4, u'bgbiao', u'https://www.jianshu.com/u/9c46ece5b7bd', None)
updating the data
(1, u'\u5f6a\u54e5', u'https://my.oschina.net/xxbAndy', None)
(2, u'xxbandy', u'http://xxbandy.github.io', None)
(3, u'\u5f6a\u54e5', u'http://xxbandy.github.io', None)
(4, u'bgbiao', u'https://www.jianshu.com/u/9c46ece5b7bd', None)
# 数据库查询记录
mysql> select * from website.useinfo;
+--------+----------+----------------------------------------+-------+
| userid | username | usersite | other |
+--------+----------+----------------------------------------+-------+
| 1 | 彪哥 | https://my.oschina.net/xxbAndy | NULL |
| 2 | xxbandy | http://xxbandy.github.io | NULL |
| 3 | 彪哥 | http://xxbandy.github.io | NULL |
| 4 | bgbiao | https://www.jianshu.com/u/9c46ece5b7bd | NULL |
+--------+----------+----------------------------------------+-------+
源码
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import pymysql
class mysqlObj:
def __init__(self,host,dbname,user,passwd,port=3306):
self.host = host
self.dbname = dbname
self.user = user
self.passwd = passwd
self.port = port
def mysqlConIns(self):
conn = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.dbname, charset='utf8')
return conn
def querydata(self,sql,args=None):
conn = self.mysqlConIns()
cur = conn.cursor()
cur.execute(sql,args)
alldata = cur.fetchall()
cur.close()
for data in alldata:
print(data)
conn.close()
#注意插入数据时单条记录使用tuple()类型;批量插入数据时使用list()类型
def changedata(self,sql,args=None):
conn = self.mysqlConIns()
cur = conn.cursor()
try:
#做一个粗暴的判断当args是list时就进行批量插入
if isinstance(args,list):
#executemany(sql,args)方法args支持tuple或者list类型
cur.executemany(sql,args)
else:
#execute(sql,args)方法args支持string,tuple,list,dict
cur.execute(sql,args)
conn.commit()
except Exception as e:
conn.rollback()
print(e)
finally:
cur.close()
conn.close()
if __name__ == '__main__':
mysqlobj = mysqlObj('localhost','website','root','123456',32773)
'''
args = ["show tables;","describe useinfo;"]
for arg in args:
mysqlobj.querydata(arg)
'''
#插入一条数据
sql = "insert into useinfo (username) values(%s)"
arg = "彪哥"
mysqlobj.changedata(sql,arg)
sql1 = "insert into useinfo (username,usersite) values(%s,%s)"
arg1 = ("xxbandy","http://xxbandy.github.io")
mysqlobj.changedata(sql1,arg1)
#批量插入数据
argslist = [("彪哥","http://xxbandy.github.io"),("bgbiao","https://www.jianshu.com/u/9c46ece5b7bd")]
mysqlobj.changedata(sql1,argslist)
#查询数据
mysqlobj.querydata("select * from useinfo")
print("updating the data")
#更新数据
data = "https://my.oschina.net/xxbAndy"
mysqlobj.changedata('''update useinfo set usersite=%s where userid = 1''',data)
mysqlobj.querydata("select * from useinfo")
网友评论