美文网首页
Python 3中的多线程

Python 3中的多线程

作者: 小温侯 | 来源:发表于2018-07-20 23:53 被阅读10次

    Python 3的多线程

    Python 3的多线程模块threading在旧版_thread模块基础上进行了更高层面的封装。

    Thread-Local数据

    Thread-Local数据是某线程的本地数据。你只需要新建一个local类的实例就可以了。

    mydata = threading.local()
    mydata.x = 1
    

    线程对象

    和multprocessing非常类似,如果你要新建一个Thread类用来代表某线程下的要执行的活动。想要在Thraed类添加你要的活动(通常是一个函数)有两种方法:一个是传入一个可被调用的对象(比如函数)到类的构造器中;另一种是覆写Thread类的run()__init__()方法。

    一旦某个线程的活动启动,线程就认为是活的(alive),直到run()方法结束,不管是正常结束还是异常结束的。你可以使用is_alive()方法来测试某个线程是否存活。

    一条线程可以调用其它线程的join()方法,之后这条线程会被阻塞直到其他线程结束。

    线程可以有名字,你可以把名字传入构造器,可以通过访问或修改name属性对线程名字进行操作。

    一条线程也可以被标识为“守护进程”,如果只剩守护进程了,Python的主程序就会退出,守护进程也会被终结。

    另外,有时可能会有“alien thread”的存在,这些线程有可能不是由threading模块船舰的,比如说直接由C代码创建。这些线程功能有限,但是是处于存活状态和守护进程状态的,并且不能被join()。最糟糕的,Python不能检测到这些线程。

    • threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
      • group为None,是给将来实现ThreadGroup预留的。
      • target,name,args,kwargs,daemon都和多进程类似。
      • 可以调用的方法有:start(), run(), join(), name, getName(), setName(), ident, is_alive, daemon, isDaemon(), setDaemon()。

    Global Interpreter Lock

    在CPython的实现中,因为GIL的存在,同一时间只能有一条线程执行Python代码(即使有些库可能能够避免这个限制)。如果你的程序是多核计算密集型的,最好使用multiprocessingconcurrent.futures.ProcessPoolExecutor;如果是I/O密集型的程序,则可以使用threading库。这一点很重要,可以作为判断使用多进程还是多线程的依据。

    锁对象和递归锁对象

    两种锁的概念和多进程中两种锁的概念完全一样,连类名字Lock()和RLock(),方法acquire(blocking=True, timeout=-1)和release()都一样。就不多做解释了。

    Condition对象(状态对象?)

    直接上例子:

    import threading
    import time
    import logging
    
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',)
    
    def consumer(cv):
        logging.debug('Consumer thread started ...')
        with cv:
            logging.debug('Consumer waiting ...')
            cv.wait()
        logging.debug('Consumer consumed the resource')
    
    def producer(cv):
        logging.debug('Producer thread started ...')
        with cv:
            logging.debug('Making resource available')
            logging.debug('Notifying to all consumers')
            cv.notifyAll()
    
    if __name__ == '__main__':
        condition = threading.Condition()
        cs1 = threading.Thread(name='consumer1', target=consumer, args=(condition,))
        cs2 = threading.Thread(name='consumer2', target=consumer, args=(condition,))
        pd = threading.Thread(name='producer', target=producer, args=(condition,))
    
        cs1.start()
        time.sleep(2)
        cs2.start()
        time.sleep(2)
        pd.start()
    

    注意这个例子里没有使用acquire()和release()方法用来申请和释放锁,这是因为 context management protocol 的存在:它允许程序使用with语句来操作相对应的锁,在本例中,因为cv是Conditon对象,所以with语句会自动获得Condition类型的锁,锁的持续时间和代码块一致。当然你也可以使用acquire()和release()方法。wait()方法会释放锁,然后进入阻塞状态直到被另一个线程用notify()或notify_all()唤醒。

    还有一个wait_for(predicate, timeout=None), predicate是一个可被调用的对象,因此这个方法就相当于:

    while not predicate():
        cv.wait()
    

    信号量对象

    信号量是计算机历史上最古老的用来结局同步问题的名词。一个信号量其实就是一个计数器,如果acquire()调用,计数器减1;如果release()调用,计数器加1。这个计数器的值不能低于0,如果调用acquire()的时候发现计数器的值是0,线程阻塞,直到这个计数器重新增加。一般这个计数器的值对应了资源的数量。

    锁其实是信号量的一种特殊情况,即这个计数器最大值为1。

    例子:

    maxconnections = 5
    # ...
    pool_sema = BoundedSemaphore(value=maxconnections)
    with pool_sema:
        conn = connectdb()
        try:
            # ... use connection ...
        finally:
            conn.close()
    

    它也支持context management protocol

    事件对象

    这是一种最简单的线程交流机制:一个线程发出一个事件信号,其他线程等待这个信号。

    一个event对象会包含一个内部的标识,调用set()方法可以将这个标识设为True;clear()可以将其设为False。wait()方法会阻塞当前线程直到这个标识变为True。is_set()可以判断这个标识,它当且仅当标识为True的时候返回True。

    计时器对象

    Timer对象通过使用计时器可以实现延迟执行某个方法,在计时器结束之前,可以调用cancel()方法终止它。

    def hello():
        print("hello, world")
    
    t = Timer(30.0, hello)
    t.start()  # after 30 seconds, "hello, world" will be printed
    

    Barrier对象(屏障对象?)

    我不确定这个barrier应该怎么翻译,从文档的解释来看,它类似于一个接力棒。它可以用来批量处理需要互相等待的线程。在这个固定数量几个线程中,每一个线程都会通过调用wait()方法来传递barrier,之后会进入阻塞直到所有的线程都调用了wait()。然后,所有的线程会一起释放。

    这个barrier可以在相同数量的线程上反复使用任意次。

    b = Barrier(2, timeout=5)
    
    def server():
        start_server()
        b.wait()
        while True:
            connection = accept_connection()
            process_server_connection(connection)
    
    def client():
        b.wait()
        while True:
            connection = make_connection()
            process_client_connection(connection)
    

    除了wait()方法,还可以使用reset()重置barrier的状态,但是相关的线程都会收到BrokenBarrierError异常。Abort()可以让barrier进入broken状态,这会让其他正在调用或即将调用wait()方法的线程都进入BrokenBarrierError异常。parties变量表示传递barrier的线程数量;n_waiting表示当前有多少线程正在等待这个barrier;broken是一个布尔值,标识一个barrier是否处于broken状态。

    计算密集型 vs. IO密集型

    此处来源进程 vs. 线程

    是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。

    计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

    计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

    第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

    IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

    相关文章

      网友评论

          本文标题:Python 3中的多线程

          本文链接:https://www.haomeiwen.com/subject/jjecmftx.html