美文网首页
python多线程并发threading模块

python多线程并发threading模块

作者: 安静的码农 | 来源:发表于2018-03-11 22:03 被阅读0次

    最近开发一个获取所有集群从读写实例的接口,运行接口发现返回数据时间长,达到了9秒多,实在是太慢了!那么慢在哪里呢?

    review代码,发现是因为需要读取二十多个不同的数据库查询结果,这些数据库分布在二十多台不同的物理机,代码采用for循环依次读取这些数据库,从而产生耗时严重的情况.

    模拟for循环依次读取数据库脚本如下:

    #!/usr/bin/python
    # encoding=utf-8
    
    import os
    import mysql.connector
    
    ossdb_dict = {'上海A':'xxx.xxx.xxx.xxx:3306','上海B':'xxx.xxx.xxx.xxx:3306','上海C':'xxx.xxx.xxx.xxx:3306','北京A':'xxx.xxx.xxx.xxx:3306','北京B':'xxx.xxx.xxx.xxx:3306','北美A':'xxx.xxx.xxx.xxx:3306','天津A':'xxx.xxx.xxx.xxx:3306','天津B':'xxx.xxx.xxx.xxx:3306','广州A':'xxx.xxx.xxx.xxx:3306','广州B':'xxx.xxx.xxx.xxx:3306','广州C':'xxx.xxx.xxx.xxx:3306','广州D':'xxx.xxx.xxx.xxx:3306','德国A':'xxx.xxx.xxx.xxx:3306','成都A':'xxx.xxx.xxx.xxx:3306','新加坡A':'xxx.xxx.xxx.xxx:3306','深圳A':'xxx.xxx.xxx.xxx:3306','深圳B':'xxx.xxx.xxx.xxx:3306','美国A':'xxx.xxx.xxx.xxx:3306','重庆A':'xxx.xxx.xxx.xxx:3306','香港A':'xxx.xxx.xxx.xxx:3306','韩国A':'xxx.xxx.xxx.xxx:3306','广州E':'xxx.xxx.xxx.xxx:3306'}
    
    cluster_dict = {'上海A':49,'上海B':61,'上海C':63,'北京A':204,'北京B':206,'北美A':59,'天津A':64,'天津B':70,'广州A':73,'广州B':22,'广州C':60,'广州永顺':D,'德国A':202,'成都A':201,'新加坡A':71,'深圳A':74,'深圳B':81,'美国A':79,'重庆A':205,'香港A':52,'韩国A':207,'广州E':76}
    
    slave_rw_sql = """select tb_mysql_pair.instance_name,tb_mysql_pair.app_name,tb_mysql_pair.master_port,tb_mysql_pair.slave_port,tb_device_pair.master_ip,tb_device_pair.slave_ip from tb_mysql_pair,tb_device_pair where tb_mysql_pair.status=19 and tb_mysql_pair.device_pair_id=tb_device_pair.pair_id;"""
    
    db_user  = 'xxx'
    db_pass = 'xxx'
    access_db = 'xxx'   
    
    def execDB(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql):
             
        cluster_ids  = []
        for key in cluster_dict:
            id   = cluster_dict[key] 
            cluster_ids.append(id)
    
        ossdb_list = [] 
        for cluster_id in cluster_ids:
            cluster_name = cluster_dict.keys()[cluster_dict.values().index(cluster_id)]
            ossdb  = ossdb_dict[cluster_name]
    
            ossdb_list.append(ossdb)
    
        #for循环一个一个读取数据库,是单线程的,所以会比较慢
        for ossdb in ossdb_list:
            cluster_name = ossdb_dict.keys()[ossdb_dict.values().index(ossdb)]
            db_host  = ossdb.split(':',1)[0]
            db_port   = ossdb.split(':',1)[1]
    
            conn = mysql.connector.connect(
                user  = db_user,
                password = db_pass,
                host  = db_host,
                port   = db_port,
                database = access_db)
    
            cur = conn.cursor()
            cur.execute(slave_rw_sql)
            slave_rw_results = cur.fetchall()
    
    
    if __name__=='__main__':
        execDB(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql)
    

    实际执行脚本,耗时情况:
    time python 3.py

    real 0m9.444s
    user 0m0.060s
    sys 0m0.016s

    跑完脚本需要0m9.444s,很显然耗时是在for循环依次查询数据库产生的耗时.

    是否有办法提高读取数据库的效率,消除耗时呢?答案是肯定的!

    python的标准库提供了并发执行的多线程模块thread和threading,thread是低级模块,而threading是高级模块,对thread进行了封装。在绝大多数情况下,我们只需要使用threading这个高级模块.

    在python中实现多线程有两种方式,一种就是函数形式,通过将需要执行的方法传入,然后创建多线程实例;另一种就是创建一个类,并且继承threading.Thread类来实现. 这里我只讲第一种方式,比较容易理解一些.

    注:
    什么是多线程呢?顾名思义,多线程就是在同一个进程的情况下拉起来多个线程,进程和线程之间的关系,就好比是工厂和工人之间的关系。工厂是一个,但是工人有多个,多人干活自然就可以提高生产效率。

    来看看多线程并发执行的脚本:

    #!/usr/bin/python
    # encoding=utf-8
    
    
    import os
    import threading  #引入threading多线程模块
    import mysql.connector
    
    ossdb_dict = {'上海A':'xxx.xxx.xxx.xxx:3306','上海B':'xxx.xxx.xxx.xxx:3306','上海C':'xxx.xxx.xxx.xxx:3306','北京A':'xxx.xxx.xxx.xxx:3306','北京B':'xxx.xxx.xxx.xxx:3306','北美A':'xxx.xxx.xxx.xxx:3306','天津A':'xxx.xxx.xxx.xxx:3306','天津B':'xxx.xxx.xxx.xxx:3306','广州A':'xxx.xxx.xxx.xxx:3306','广州B':'xxx.xxx.xxx.xxx:3306','广州C':'xxx.xxx.xxx.xxx:3306','广州D':'xxx.xxx.xxx.xxx:3306','德国A':'xxx.xxx.xxx.xxx:3306','成都A':'xxx.xxx.xxx.xxx:3306','新加坡A':'xxx.xxx.xxx.xxx:3306','深圳A':'xxx.xxx.xxx.xxx:3306','深圳B':'xxx.xxx.xxx.xxx:3306','美国A':'xxx.xxx.xxx.xxx:3306','重庆A':'xxx.xxx.xxx.xxx:3306','香港A':'xxx.xxx.xxx.xxx:3306','韩国A':'xxx.xxx.xxx.xxx:3306','广州E':'xxx.xxx.xxx.xxx:3306'}
    
    cluster_dict = {'上海A':49,'上海B':61,'上海C':63,'北京A':204,'北京B':206,'北美A':59,'天津A':64,'天津B':70,'广州A':73,'广州B':22,'广州C':60,'广州永顺':D,'德国A':202,'成都A':201,'新加坡A':71,'深圳A':74,'深圳B':81,'美国A':79,'重庆A':205,'香港A':52,'韩国A':207,'广州E':76}
    
    slave_rw_sql = """select tb_mysql_pair.instance_name,tb_mysql_pair.app_name,tb_mysql_pair.master_port,tb_mysql_pair.slave_port,tb_device_pair.master_ip,tb_device_pair.slave_ip from tb_mysql_pair,tb_device_pair where tb_mysql_pair.status=19 and tb_mysql_pair.device_pair_id=tb_device_pair.pair_id;"""
    
    db_user  = 'xxx'
    db_pass  = 'xxx'
    access_db  = 'xxx'   
    
    def execDB(ossdb,ossdb_dict,db_user,db_pass,access_db,slave_rw_sql):
        cluster_name = ossdb_dict.keys()[ossdb_dict.values().index(ossdb)]
        db_host   = ossdb.split(':',1)[0]
        db_port    = ossdb.split(':',1)[1]
        
        conn = mysql.connector.connect(
            user  = db_user,
            password = db_pass,
            host   = db_host,
            port    = db_port,
            database = access_db)
            
        cur = conn.cursor()
        cur.execute(slave_rw_sql)
        slave_rw_results = cur.fetchall()
    
    
    #定义多线程执行的函数multithread  
    def multithread(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql):
        cluster_ids  = []
        for key in cluster_dict:
            id  = cluster_dict[key]
            cluster_ids.append(id)
            
        ossdb_list = []
        for cluster_id in cluster_ids:
            cluster_name = cluster_dict.keys()[cluster_dict.values().index(cluster_id)]
            ossdb   = ossdb_dict[cluster_name]
            
            ossdb_list.append(ossdb)
        
        #定义一个列表threads,存储要启动多线程的实例
        threads = []
        
        #循环读取多个ossdb,也就是启动了多个线程去查询DB啦~
        for ossdb in ossdb_list:
               #target表示实际要执行读取数据库的函数,multithread函数调用execDB函数,往execDB函数传参.
            t   = threading.Thread(target=execDB,args=(ossdb,ossdb_dict,db_user,db_pass,access_db,slave_rw_sql,))  
            threads.append(t)   #把要启动多线程的实例,追加到列表threads
            
    
        #把threads列表中的实例遍历出来后,调用start()方法启动多线程,就会有多个线程并发去读取数据库
        for thr in threads:
            thr.start()
        
        for thr in threads:
            #isAlive()可以返回true或者false,用来判断此时是否还有没有执行完的线程,如果还有未执行完的线程就让主线程等待线程执行结束之后,主线程再来结束.
            if thr.isAlive():
                thr.join()
        
    if __name__=='__main__':
        multithread(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql)
    

    上述脚本遍历了两次threads列表,最后一次遍历的目的是为了查看还有没有没有执行完成的子线程,只要还有子线程是活的,没有退出,就通过join()方法强制程序不可以让主线程退出,只有等所有子线程执行完成退出后,才能让主线程退出.

    来看看采用多线程并发之后,实际执行脚本耗时:
    time python 1.py
    real 0m1.927s
    user 0m0.064s
    sys 0m0.012s

    可以看到耗时0m1.927s,效率已经提升了好几倍,这个耗时在可接受范围.

    相关文章

      网友评论

          本文标题:python多线程并发threading模块

          本文链接:https://www.haomeiwen.com/subject/ngqgfftx.html