美文网首页
多线程编程---python

多线程编程---python

作者: sssnowyue | 来源:发表于2017-08-26 14:00 被阅读125次

    参考 《Python 核心编程》(第三版)

    一、Thread模板

    缺点:
    • 不支持守护线程:当主线程退出时,所有子线程都将终止,不管它们是否在工作。
    • 同步原语少
    #-*- coding: UTF - 8 - *-
    import thread
    from time import sleep, ctime
    
    
    def loop0():
        print 'start loop0 at:', ctime()
        sleep(4)
        print 'end loop0 at:', ctime()
    
    
    def loop1():
        print 'start loop1 at:', ctime()
        sleep(2)
        print 'end loop1 at;', ctime()
    
    
    def main():
        print 'all start at:', ctime()
        thread.start_new_thread(loop0, ())#派生一个新线程
        thread.start_new_thread(loop1, ())
        sleep(6)
        print 'all end at:', ctime()
    
    if __name__ == '__main__':
        main()
    
    #-*- coding: UTF - 8 - *-
    import thread
    from time import sleep, ctime
    
    loops = [4, 2]
    
    
    def loop(nloop, nsec, lock):
        print 'start loop', nloop, 'at:', ctime()
        sleep(nsec)
        print 'end loop', nloop, 'at:', ctime()
        lock.release()  # release():释放锁
    
    
    def main():
        print 'start at:', ctime()
        locks = []
        nloops = range(len(loops))
    
        for i in nloops:
            lock = thread.allocate_lock()  # allocate_lock():分配锁对象
            lock.acquire()  # acquire():获得锁对象
            locks.append(lock)
    
        for i in nloops:
            thread.start_new_thread(loop, (i, loops[i], locks[i]))
            for i in nloops:
                while locks[i].locked():
                    pass
            print 'all end at:', ctime()
    
    if __name__ == '__main__':
        main()
    

    二、threading模板

    优点:
    • 支持守护线程:如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成
    thread.daemon = True
    

    Thread类

    方案一:创建Thread实例,传给它一个函数
    #-*- coding: UTF - 8 - *-
    import threading
    from time import sleep, ctime
    loops = [4, 2]
    
    
    def loop(nloop, nsec):
        print 'start loop', nloop, 'at:', ctime()
        sleep(nsec)
        print 'end loop', nloop, 'at:', ctime()
    
    
    def main():
        print 'starting at:', ctime()
        threads = []
        nloops = range(len(loops))
    
        for i in nloops:
            t = threading.Thread(target=loop, args=(i, loops[i]))
            threads.append(t)
    
        for i in nloops:
            threads[i].start()#start():开始执行该线程
    
        for i in nloops:
            threads[i].join()#join(timeout=None):直至启动的线程终止之前一直挂起。除非给出了timeout(秒),否则一直堵塞
    
        print 'ending at:', ctime()
    
    if __name__ == '__main__':
        main()
    
    方案二:创建Thread实例,传给它一个可调用的类实例
    #-*- coding: UTF - 8 - *-
    import threading
    from time import sleep, ctime
    loops = [4, 2]
    
    
    class ThreadFunc(object):
        def __init__(self, func, args, name=''):
            self.name = name
            self.func = func
            self.args = args
    
        def __call__(self):
            self.func(*self.args)
    
    
    def loop(nloop, nsec):
        print 'start loop', nloop, 'at:', ctime()
        sleep(nsec)
        print 'end loop', nloop, 'at:', ctime()
    
    
    def main():
        print 'starting at:', ctime()
        threads = []
        nloops = range(len(loops))
    
        for i in nloops:
            t = threading.Thread(target=ThreadFunc(
                loop, (i, loops[i]), loop.__name__))
            threads.append(t)
    
        for i in nloops:
            threads[i].start()  # start():开始执行该线程
    
        for i in nloops:
            # join(timeout=None):直至启动的线程终止之前一直挂起。除非给出了timeout(秒),否则一直堵塞
            threads[i].join()
    
        print 'ending at:', ctime()
    
    
    if __name__ == '__main__':
        main()
    
    
    方案三:派生Thread的子类,并创建子类的实例
    #-*- coding: UTF - 8 - *-
    import threading
    from time import sleep, ctime
    loops = [4, 2]
    
    
    class MyThread(threading.Thread):
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
    
        def run(self):  # run():定义线程功能的方法(通常在子类中被应用开发者重写)
            self.func(*self.args)
    
    
    def loop(nloop, nsec):
        print 'start loop', nloop, 'at:', ctime()
        sleep(nsec)
        print 'end loop', nloop, 'at:', ctime()
    
    
    def main():
        print 'starting at:', ctime()
        threads = []
        nloops = range(len(loops))
    
        for i in nloops:
            t = MyThread(loop, (i, loops[i]), loop.__name__)
            threads.append(t)
    
        for i in nloops:
            threads[i].start()  # start():开始执行该线程
    
        for i in nloops:
            # join(timeout=None):直至启动的线程终止之前一直挂起。除非给出了timeout(秒),否则一直堵塞
            threads[i].join()
    
        print 'ending at:', ctime()
    
    
    if __name__ == '__main__':
        main()
    
    修改上面的MyThread类(把结果保存在实例属性self.res中,并创建新方法getResult()来获取其值)>>>方便被导入
    class MyThread(threading.Thread):
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
    
        def getResult(self):
            return self.res
    
        def run(self):#run():定义线程功能的方法(通常在子类中被应用开发者重写)
            self.res=self.func(*self.args)
    

    同步源语:锁/互斥、信号量

    一、锁:

    原理:

    当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码。所有之后到达的线程将被堵塞,直到第一个线程执行结束,退出临界区,并释放锁。此时其他等待的线程可以获得锁并进入临界区。(被堵塞的线程是没有顺序的)

    应用场景:

    特殊的函数、代码块不希望(或不应该)被多个线程同时执行
    • 修改数据库
    • 更新文件
    • ......

    代码:

    from threading import Lock
    lock = Lock()
    lock.acquire()#获取锁
    lock.release()#释放锁
    
    #上下文管理器
    from __future__ import with_statement
    with lock:
        ......#锁的线程块
    
    二、信号量:
    信号量是最古老的同步原语之一
    threading模块包括两种信号量类:SemaphoreBoundedSemaphore(BoundedSemaphore额外功能:计数器永远不会超过初始值,可以防范其中信号量释放次数多于获得次数的异常用例)

    原理:

    它是一个计数器,当资源消耗(acquire)时,计数器值减1;当资源释放(release)时,计数器值加1

    应用场景:

    • 线程拥有有限资源
    • ......

    代码(糖果机):

    #-*- coding: UTF - 8 - *-
    from atexit import register
    from random import randrange
    from threading import BoundedSemaphore, Lock, Thread
    from time import sleep, ctime
    
    lock = Lock()
    Max = 5
    candytray = BoundedSemaphore(Max)
    
    
    def refill():
        lock.acquire()
        print 'Refilling candy...'
        try:
            candytray.release()
        except ValueError:
            print 'full,skipping'
        else:
            print 'OK'
        lock.release()
    
    
    def buy():
        lock.acquire()
        print 'Buying candy...'
        if candytray.acquire(False):
            print 'OK'
        else:
            print 'empty,skipping'
        lock.release()
    
    
    def producer(loops):
        for i in xrange(loops):
            refill()
            sleep(randrange(3))
    
    
    def consumer(loops):
        for i in xrange(loops):
            buy()
            sleep(randrange(3))
    
    
    def _main():
        print 'starting at:', ctime()
        nloops = randrange(2, 6)
        print 'THE CANDY MACHINE (full with %d bars)!' % Max
        Thread(target=consumer, args=(randrange(nloops, nloops + Max + 2),)).start()
        Thread(target=producer, args=(nloops,)).start()
    
    
    @register
    def _atexit():
        print 'all DONE at:', ctime()
    
    
    if __name__ == '__main__':
        _main()
    



    生产者-消费者问题(Queue/queue模块)

    原理:

    创建一个队列,生产者(线程)生产时放入商品,消费者(线程)消费时取出商品

    应用场景:

    生产者-消费者及类似情景【生产时间不确定,消费时间不确定】

    代码:

    #-*- coding: UTF - 8 - *-
    from random import randint
    from time import sleep
    from Queue import Queue
    from threading3 import MyThread
    
    
    def writeQ(queue):
        print 'producing object for Q...',
        queue.put('xxx', 1)
        print "size now", queue.qsize()  # qsize():返回队列大小
    
    
    def readQ(queue):
        print 'consumed object from Q... size now', queue.qsize()
    
    
    def writer(queue, loops):
        for i in range(loops):
            writeQ(queue)
            sleep(randint(1, 3))
    
    
    def reader(queue, loops):
        for i in range(loops):
            readQ(queue)
            sleep(randint(2, 5))
    
    
    funcs = [writer, reader]
    nfuncs = range(len(funcs))
    
    
    def main():
        nloops = randint(2, 5)
        # Queue(maxsize=0):创建一个先入先出的队列,如果给出最大值,则在队列没有空间时堵塞;否则(没有指定最大值),为无限队列。
        q = Queue(32)
        threads = []
    
        for i in nfuncs:
            t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
            threads.append(t)
    
        for i in nfuncs:
            threads[i].start()
    
        for i in nfuncs:
            threads[i].join()
    
        print 'all Done'
    
    
    if __name__ == '__main__':
        main()
    
    



    concurrent.futures模块

    优点:

    • "任务"级别进行操作
    • 不需要过分关注同步和线程/进程的管理

    原理:

    指定一个给定数量的线程池/进程池------提交任务------整理结果

    代码:

    #-*- coding: UTF - 8 - *-
    from concurrent.futures import ThreadPoolExecutor#ThreadPoolExecutor-多线程,ProcessPoolExecutor-多进程
    from re import compile
    from time import ctime
    from urllib.request import urlopen as uopen
    
    REGEX = compile('#([\d,]+) in Books ')
    AMZN = 'http://amazon.com/dp/'
    ISBNS = {
        '0132269937': 'Core Python Programming',
        '0132356139': 'Python Web Development with Django',
        '0137143419': 'Python Fundamentals',
    }
    
    
    def getRanking(isbn):
        with uopen('{0}{1}'.format(AMZN, isbn)) as page:
            return str(REGEX.findall(page.read())[0],'utf-8')
    
    
    def _main():
        print ('Start at', ctime(), 'on Amazon...')
        with ThreadPoolExecutor(3) as executor:#ThreadPoolExecutor(n):n代表线程池个数
            for isbn, ranking in zip(ISBNS, executor.map(getRanking, ISBNS)):
                print ('- %r ranked - %s' % (ISBNS[isbn], ranking))
        print('all Done at:', ctime())
    
    
    if __name__ == '__main__':
        _main()
    

    实践

    1、Amazon图书排行排名

    #-*- coding: UTF - 8 - *-
    from atexit import register#atexit.register()函数:告知脚本结束时间
    from re import compile
    from threading import Thread
    from time import ctime
    from urllib2 import urlopen as uopen
    
    REGEX = compile('#([\d,]+) in Books ')
    AMZN = 'http://amazon.com/dp/'
    ISBNS = {
        '0132269937':'Core Python Programming',
        '0132356139':'Python Web Development with Django',
        '0137143419':'Python Fundamentals',
    }
    
    def getRanking(isbn):
        page = uopen('%s%s' % (AMZN,isbn))
        data = page.read()
        page.close()
        return REGEX.findall(data)[0]
    
    def _showRanking(isbn):#函数名前面的单划线--->特殊函数--->只能被本模块的代码使用,不能被其他使用本文件作为库或者工具模块的应用导入
        print '- %r ranked %s' %(ISBNS[isbn],getRanking(isbn))
    
    def _main():
        print 'At',ctime(),'on Amazon......'
        for isbn in ISBNS:
            #单线程
            # _showRanking(isbn)
            #多线程
            Thread(target=_showRanking,args=(isbn,)).start()
    
    @register
    def _atexit():
        print 'all DONE at:',ctime()
    
    if __name__=='__main__':
        _main()
    

    相关文章

      网友评论

          本文标题:多线程编程---python

          本文链接:https://www.haomeiwen.com/subject/awwmdxtx.html