美文网首页
python | pymysql模块

python | pymysql模块

作者: Root_123 | 来源:发表于2019-03-01 16:56 被阅读12次

    1、基本增删改查操作
    2、python调用存储过程
    3、多线程实现mysql存取操作

    一、基础操作

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pymysql 
    
    # 连接数据库
    #conn = pymysql.connect('localhost', 'root', 'root')
    
    # 也可以使用关键字参数
    #conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='', charset='utf8')
    
    # 也可以使用字典进行连接参数的管理
    config = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'passwd': '',
        'charset': 'utf8'
    }
    conn = pymysql.connect(**config)
    
    # 如果使用事务引擎,可以设置自动提交事务,或者在每次操作完成后手动提交事务conn.commit()
    #conn.autocommit(1)    # conn.autocommit(True) 
    
    # 使用cursor()方法获取操作游标
    cursor = conn.cursor()
    # 因该模块底层其实是调用CAPI的,所以,需要先得到当前指向数据库的指针。
    
    try:
        # 创建数据库
        DB_NAME = 'test'
        cursor.execute('DROP DATABASE IF EXISTS %s' %DB_NAME)
        cursor.execute('CREATE DATABASE IF NOT EXISTS %s' %DB_NAME)
        conn.select_db(DB_NAME)
    
         #创建表
        TABLE_NAME = 'user'
        cursor.execute('CREATE TABLE %s(id int primary key,name varchar(30))' %TABLE_NAME)
    
        # 插入单条数据
        sql = 'INSERT INTO user values("%d","%s")' %(1,"jack")
        
        # 批量插入数据
        values = []
        for i in range(3, 20):
            values.append((i,'kk'+str(i)))
        cursor.executemany('INSERT INTO user values(%s,%s)',values)
       
       # 查询数据条目
        count = cursor.execute('SELECT * FROM %s' %TABLE_NAME)
        print('total records: %d' %count)
        print('total records:', cursor.rowcount)
    
        # 获取表名信息
        desc = cursor.description
        print("%s %3s" % (desc[0][0], desc[1][0]))
    
        # 查询一条记录
        print 'fetch one record:'
        result = cursor.fetchone()
        print result
        print ('id: %s,name: %s' %(result[0],result[1]))
    
        # 查询多条记录
        print 'fetch five record:'
        results = cursor.fetchmany(5)
        for r in results:
            print (r)
    
        print('===============')
    
         # 查询所有记录
        # 重置游标位置,偏移量:大于0向后移动;小于0向前移动,mode默认是relative
        # relative:表示从当前所在的行开始移动; absolute:表示从第一行开始移动
        cursor.scroll(0,mode='absolute')
        results = cursor.fetchall()
        for r in results:
            print r
        print('--------------')
        cursor.scroll(-2)
        results = cursor.fetchall()
        for r in results:
            print r
    
        # 更新记录
        cursor.execute('UPDATE %s SET name = "%s" WHERE id = %s' %(TABLE_NAME,'Jack',1))
    
    
    
        # 如果没有设置自动提交事务,则这里需要手动提交一次
        conn.commit()
    except:
        import traceback
        traceback.print_exc()
        # 发生错误时会滚
        conn.rollback()
    finally:
        # 关闭游标连接
        cursor.close()
        # 关闭数据库连接
        conn.close()
    
    

    每次都连接关闭很麻烦,使用上下文管理,简化连接过程

    import pymysql
    import contextlib
    #定义上下文管理器,连接后自动关闭连接
    @contextlib.contextmanager
    def mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'):
      conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
      cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
      try:
        yield cursor
      finally:
        conn.commit()
        cursor.close()
        conn.close()
     
    # 执行sql
    with mysql() as cursor:
      print(cursor)
      row_count = cursor.execute("select * from tb7")
      row_1 = cursor.fetchone()
      print row_count, row_1
    

    二、如何python调用 callproc 进行调用储存过程

    1.创建完整的Mysql数据库连接
    2.使用cursor()初始化数据库游标
    3.使用游标来调用callproc函数 里面添加 需要传入的变量 例如 callproc(name,args) name="proc_user",args=['21',syh];
    4.cursor可以传递出一系列的结果集,使用storeresult来获取一系列的iterator指向结果集
    5.用fetchall方法获取结果

    callproc 无法直接获得out和INOUT变量 ,但是变量存在server中,可以通过@_procname_n 来获取变量值,可以按照传入参数的位置获取,如第1个 SELECT @_procname_0

    调用无参存储过程

    #! /usr/bin/env python
    # -*- coding:utf-8 -*-
    # __author__ = "TKQ"
    
    import pymysql
    
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
    #游标设置为字典类型
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    #无参数存储过程
    cursor.callproc('p2')    #等价于cursor.execute("call p2()")
    
    row_1 = cursor.fetchone()
    print row_1
    
    conn.commit()
    cursor.close()
    conn.close()
    

    调用有参存储过程

    #! /usr/bin/env python
    # -*- coding:utf-8 -*-
    # __author__ = "TKQ"
    
    import pymysql
    
    conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    
    cursor.callproc('p1', args=(1, 22, 3, 4))
    #获取执行完存储的参数,参数@开头
    cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3")   #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}
    row_1 = cursor.fetchone()
    print row_1
    
    
    conn.commit()
    cursor.close()
    conn.close()
    

    三、多线程、多进程操作mysql

    1、多线程连接:

    起初使用threading.Thread模块,先建立一个MySQL连接,然后由多个线程来执行具体的SQL。但发现在执行的时候,不是报MySQL连接被关闭,就是出现其他异常错误。上网查询,是因为多个线程无法共享一个数据库连接,会出现不可预测的情况。

    官方建议使用连接池模块,参照了他人的做法,创建线程连接池,一次性创建多个连接。

    2、创建线程池的三种方法
    1)过去:
    使用threadpool模块,这是个python的第三方模块,支持python2和python3。threadpool是一个比较老的模块了,现在虽然还有一些人在用,但已经不再是主流了,关于python多线程,现在已经开始步入未来(future模块)了具体使用方式如下:

    import threadpool
    import time
    
    def sayhello (a):
        print("hello: "+a)
        time.sleep(2)
    
    def main():
        global result
        seed=["a","b","c"]
        start=time.time()
        task_pool=threadpool.ThreadPool(5)
        requests=threadpool.makeRequests(sayhello,seed)
        for req in requests:
            task_pool.putRequest(req)
        task_pool.wait()
        end=time.time()
        time_m = end-start
        print("time: "+str(time_m))
        start1=time.time()
        for each in seed:
            sayhello(each)
        end1=time.time()
        print("time1: "+str(end1-start1))
    
    if __name__ == '__main__':
        main()
    

    运行结果如下:


    2)未来:
    使用concurrent.futures模块,这个模块是python3中自带的模块,但是,python2.7以上版本也可以安装使用,具体使用方式如下:

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def sayhello(a):
        print("hello: "+a)
        time.sleep(2)
    
    def main():
        seed=["a","b","c"]
        start1=time.time()
        for each in seed:
            sayhello(each)
        end1=time.time()
        print("time1: "+str(end1-start1))
        start2=time.time()
        with ThreadPoolExecutor(3) as executor:
            for each in seed:
                executor.submit(sayhello,each)
        end2=time.time()
        print("time2: "+str(end2-start2))
        start3=time.time()
        with ThreadPoolExecutor(3) as executor1:
            executor1.map(sayhello,seed)
        end3=time.time()
        print("time3: "+str(end3-start3))
    
    if __name__ == '__main__':
        main()
    

    运行结果如下:


    【注意】
    concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

    • map可以保证输出的顺序, submit输出的顺序是乱的
    • 如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
    • submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

    3)现在?
    这里要考虑一个问题,以上两种线程池的实现都是封装好的,任务只能在线程池初始化的时候添加一次,那么,假设我现在有这样一个需求,需要在线程池运行时,再往里面添加新的任务(注意,是新任务,不是新线程),那么要怎么办?
    其实有两种方式:

    • 重写threadpool或者future的函数:这个方法需要阅读源模块的源码,必须搞清楚源模块线程池的实现机制才能正确的根据自己的需要重写其中的方法。

    • 自己构建一个线程池:这个方法就需要对线程池的有一个清晰的了解了,附上构建的一个线程池实例:

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import threading
    import Queue
    import hashlib
    import logging
    from utils.progress import PrintProgress
    from utils.save import SaveToSqlite
    
    
    class ThreadPool(object):
        def __init__(self, thread_num, args):
    
            self.args = args
            self.work_queue = Queue.Queue()
            self.save_queue = Queue.Queue()
            self.threads = []
            self.running = 0
            self.failure = 0
            self.success = 0
            self.tasks = {}
            self.thread_name = threading.current_thread().getName()
            self.__init_thread_pool(thread_num)
    
        # 线程池初始化
        def __init_thread_pool(self, thread_num):
            # 下载线程
            for i in range(thread_num):
                self.threads.append(WorkThread(self))
            # 打印进度信息线程
            self.threads.append(PrintProgress(self))
            # 保存线程
            self.threads.append(SaveToSqlite(self, self.args.dbfile))
    
        # 添加下载任务
        def add_task(self, func, url, deep):
            # 记录任务,判断是否已经下载过
            url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
            if not url_hash in self.tasks:
                self.tasks[url_hash] = url
                self.work_queue.put((func, url, deep))
                logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))
    
        # 获取下载任务
        def get_task(self):
            # 从队列里取元素,如果block=True,则一直阻塞到有可用元素为止。
            task = self.work_queue.get(block=False)
    
            return task
    
        def task_done(self):
            # 表示队列中的某个元素已经执行完毕。
            self.work_queue.task_done()
    
        # 开始任务
        def start_task(self):
            for item in self.threads:
                item.start()
    
            logging.debug("Work start")
    
        def increase_success(self):
            self.success += 1
    
        def increase_failure(self):
            self.failure += 1
    
        def increase_running(self):
            self.running += 1
    
        def decrease_running(self):
            self.running -= 1
    
        def get_running(self):
            return self.running
    
        # 打印执行信息
        def get_progress_info(self):
            progress_info = {}
            progress_info['work_queue_number'] = self.work_queue.qsize()
            progress_info['tasks_number'] = len(self.tasks)
            progress_info['save_queue_number'] = self.save_queue.qsize()
            progress_info['success'] = self.success
            progress_info['failure'] = self.failure
    
            return progress_info
    
        def add_save_task(self, url, html):
            self.save_queue.put((url, html))
    
        def get_save_task(self):
            save_task = self.save_queue.get(block=False)
    
            return save_task
    
        def wait_all_complete(self):
            for item in self.threads:
                if item.isAlive():
                    # join函数的意义,只有当前执行join函数的线程结束,程序才能接着执行下去
                    item.join()
    
    # WorkThread 继承自threading.Thread
    class WorkThread(threading.Thread):
        # 这里的thread_pool就是上面的ThreadPool类
        def __init__(self, thread_pool):
            threading.Thread.__init__(self)
            self.thread_pool = thread_pool
    
        #定义线程功能方法,即,当thread_1,...,thread_n,调用start()之后,执行的操作。
        def run(self):
            print (threading.current_thread().getName())
            while True:
                try:
                    # get_task()获取从工作队列里获取当前正在下载的线程,格式为func,url,deep
                    do, url, deep = self.thread_pool.get_task()
                    self.thread_pool.increase_running()
    
                    # 判断deep,是否获取新的链接
                    flag_get_new_link = True
                    if deep >= self.thread_pool.args.deep:
                        flag_get_new_link = False
    
                    # 此处do为工作队列传过来的func,返回值为一个页面内容和这个页面上所有的新链接
                    html, new_link = do(url, self.thread_pool.args, flag_get_new_link)
    
                    if html == '':
                        self.thread_pool.increase_failure()
                    else:
                        self.thread_pool.increase_success()
                        # html添加到待保存队列
                        self.thread_pool.add_save_task(url, html)
    
                    # 添加新任务,即,将新页面上的不重复的链接加入工作队列。
                    if new_link:
                        for url in new_link:
                            self.thread_pool.add_task(do, url, deep + 1)
    
                    self.thread_pool.decrease_running()
                    # self.thread_pool.task_done()
                except Queue.Empty:
                    if self.thread_pool.get_running() <= 0:
                        break
                except Exception, e:
                    self.thread_pool.decrease_running()
                    # print str(e)
                    break
    

    3、多进程连接
    多进程(multiprocessing模块),具体使用看这里multiprocessing模块的使用
    可以和MySQL建立多个连接并发执行SQL。对比执行耗时,整体性能比单个进程快,但其中单个SQL的执行效率,多进程没有单进程执行的快。
    demo:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import MySQLdb
    from multiprocessing import Process,Pool
    
    class mysqlopr():
        '''省略'''
    
    pool = Pool(5)      ####设置进程数
    for i in range(10):
        pool.apply_async(func=run_sql_func, args=(i,))      ####异步执行
        #pool.apply(func=run_sql_func, args=(arg,))                 ####同步执行,官方不建议使用,python3.+版本已无该方法
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
    

    相关文章

      网友评论

          本文标题:python | pymysql模块

          本文链接:https://www.haomeiwen.com/subject/sexjuqtx.html