40.Python编程:多线程

作者: TensorFlow开发者 | 来源:发表于2018-08-12 00:47 被阅读9次

前言

多任务可以由多进程完成,也可以由一个进程内的多线程完成。我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。

由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。

Python3 线程中常用的两个模块为:

  • _thread
  • threading(推荐使用)

thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用thread 模块。为了兼容性,Python3 将 thread 重命名为 _thread

_thread 与 threading

_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于threading 模块的功能还是比较有限的。threading 模块除了包含_thread模块中的所有方法外,还提供的其他方法:

threading.currentThread(): 返回当前的线程变量。

threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:

run(): 用以表示线程活动的方法。

start():启动线程活动。

join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。

isAlive(): 返回线程是否活动的。

getName(): 返回线程名。

setName(): 设置线程名。

多线程

启动一个线程说白了就是把一个函数传入并创建Thread实例,然后调用start()开始执行。

import threading, random
from time import sleep


def task():
    print("Thread 【{}】 is running...".format(threading.currentThread().getName()))
    n = 0
    while n < 5:
        print("Thread 【{}】 is performing task...{}".format(threading.currentThread().getName(), n))
        n=n+1
        sleep(random.random())
    print("Thread 【{}】 finished.".format(threading.currentThread().getName()))


if __name__ == "__main__":
    print("Thread 【{}】 is running...".format(threading.currentThread().getName()))

    # 创建2条子线程t1,t2,并分别命名为:"task_thread-1","task_thread-2"
    t1 = threading.Thread(target=task, name="task_thread-1")
    t2 = threading.Thread(target=task, name="task_thread-2")

    # 启动子线程
    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("Thread 【{}】 finished.".format(threading.currentThread().getName()))

运行结果:

Thread 【MainThread】 is running...
Thread 【task_thread-1】 is running...
Thread 【task_thread-1】 is performing task...0
Thread 【task_thread-2】 is running...
Thread 【task_thread-2】 is performing task...0
Thread 【task_thread-1】 is performing task...1
Thread 【task_thread-2】 is performing task...1
Thread 【task_thread-2】 is performing task...2
Thread 【task_thread-1】 is performing task...2
Thread 【task_thread-2】 is performing task...3
Thread 【task_thread-1】 is performing task...3
Thread 【task_thread-1】 is performing task...4
Thread 【task_thread-2】 is performing task...4
Thread 【task_thread-1】 finished.
Thread 【task_thread-2】 finished.
Thread 【MainThread】 finished.

说明:
1.由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程实例的名字叫MainThread。主线程MainThread又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。子线程的名字在创建时指定,本例中我们用了task_thread-1task_thread-2命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1,Thread-2...以此类推。

2.上面启动一个线程例子的核心:就是把一个函数传入并创建Thread实例,然后调用start()开始执行。

线程锁 Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

使用 Thread 对象的 LockRlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquirerelease方法之间。

假如,12306平台多线程卖票:假设12306开始是有10张票ticket_count_12306 = 10。创建5条线程去卖这10张票,其中某线程卖出后,过了sleep(0.0000000000000000000000000000001)后,该乘客又把票退了。理论上讲:此时12306平台还应该是10张票。但由于多条线程共享这10张票ticket_count_12306 = 10,结果就会有点意外。示例代码如下:

未使用线程锁 示例:
# 假设12306开始是有10张票
ticket_count_12306 = 10

# 卖票函数
def ticket_count_test():
    global ticket_count_12306
    # 卖出3张给乘客
    ticket_count_12306 = ticket_count_12306 - 3
    sleep(0.0000000000000000000000000000001)
    # 乘客退票3张
    ticket_count_12306 = ticket_count_12306 + 3
    print("{}剩余{}张".format(threading.currentThread().getName(), ticket_count_12306))
    if ticket_count_12306 != 10:
        print("【Error】:{}剩余{}张".format(threading.currentThread().getName(), ticket_count_12306))

# 未使用线程锁,已注释改行代码
# lock = threading.Lock()

# 多线程目标函数
def thread_test():
    for i in range(100):

        # lock.acquire()
        ticket_count_test()
        # lock.release()

if __name__ == "__main__":
    print("Thread 【{}】 is running...".format(threading.currentThread().getName()))

    # 创建5条线程,去卖票。
    t1 = threading.Thread(target=thread_test, name="线程1")
    t2 = threading.Thread(target=thread_test, name="线程2")
    t3 = threading.Thread(target=thread_test, name="线程3")
    t4 = threading.Thread(target=thread_test, name="线程4")
    t5 = threading.Thread(target=thread_test, name="线程5")

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()

    print("Thread 【{}】 finished.".format(threading.currentThread().getName()))

运行结果:

Thread 【MainThread】 is running...
线程1剩余-2张
【Error】:线程1剩余-2张
线程4剩余-2张
【Error】:线程4剩余-2张
线程2剩余-2张
【Error】:线程2剩余-2张

...(省略若干打印)

线程2剩余4张
【Error】:线程2剩余4张
【Error】:线程3剩余1张
线程4剩余1张
【Error】:线程4剩余1张
线程2剩余4张
线程3剩余7张
线程5剩余10张
Thread 【MainThread】 finished.

我们定义了一个共享变量ticket_count_12306,初始值为10,并且启动5个线程,理论上结果应该为10,但是,由于线程的调度是由操作系统决定的,当t1、t2、t3、t4、t5交替执行时,ticket_count_12306的结果就不一定是10了。

原因是因为高级语言的一条语句在CPU执行时是若干条语句

 # 卖出3张给乘客
    ticket_count_12306 = ticket_count_12306 - 3

    sleep(0.0000000000000000000000000000001)

    # 乘客退票3张
    ticket_count_12306 = ticket_count_12306 + 3

即使一个简单的计算:
ticket_count_12306 = ticket_count_12306 - 3
也分两步:
1.计算ticket_count_12306 - 3,存入临时变量中;
2.将临时变量的值赋给ticket_count_12306
也就是可以看成:

x = ticket_count_12306 - 3
ticket_count_12306 = x

为了放大这种耗时效果,我们采取了措施:sleep(0.0000000000000000000000000000001)。假如当线程1开始卖出3张票时,也被其他4个线程已各卖出3张且这4个线程都没来的及退票,此时线程1首先退票完成:10(即:12306最初的10张票) - 5 * 3(即:5个线程各卖出3张票) + 3(即:线程1首先退票完成) = -2(即:打印结果中的 线程1剩余-2张)

使用线程锁

如果我们要确保ticket_count_12306计算正确,就要给ticket_count_test()上一把锁,当某个线程开始执行ticket_count_test()时,我们说,该线程因为获得了锁,因此其他线程不能同时执行ticket_count_test(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。

创建一个锁就是通过threading.Lock()来实现:lock = threading.Lock()

使用 Thread 对象的 LockRlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquirerelease方法之间。

# 创建线程锁
lock = threading.Lock()

# 多线程目标函数
def thread_test():
    for i in range(100):

        # 获取线程锁
        lock.acquire()

        ticket_count_test()

        # 释放锁
        lock.release()

运行结果:

Thread 【MainThread】 is running...
线程1剩余10张
线程1剩余10张
线程1剩余10张
线程1剩余10张
线程1剩余10张
线程1剩余10张
线程3剩余10张

...(省略若干打印)

线程5剩余10张
线程5剩余10张
线程5剩余10张
线程5剩余10张
线程5剩余10张
线程5剩余10张
线程5剩余10张
线程5剩余10张
Thread 【MainThread】 finished.

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们可以用try...finally来确保锁一定会被释放。

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

GIL

提示 GIL:Global Interpreter Lock

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

补充:

线程优先级队列( Queue)

Python 的 Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列QueueLIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue

这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

Queue 模块中的常用方法:

Queue.qsize() 返回队列的大小

Queue.empty() 如果队列为空,返回True,反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.full 与 maxsize 大小对应

Queue.get([block[, timeout]])获取队列,timeout等待时间

Queue.get_nowait() 相当Queue.get(False)

Queue.put(item) 写入队列,timeout等待时间

Queue.put_nowait(item) 相当Queue.put(item, False)

Queue.task_done() 在完成一项工作之后,会向任务已经完成的队列发送一个信号

Queue.join() 实际上意味着等到队列为空,再执行别的操作

小结

多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时又要小心死锁的发生。

Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。


更多了解,可关注公众号:人人懂编程


微信公众号:人人懂编程

相关文章

网友评论

    本文标题:40.Python编程:多线程

    本文链接:https://www.haomeiwen.com/subject/qsbrbftx.html