- 多线程给我们带来的好处是可以并发的执行多个任务,特别是对于I/O密集型的业务,使用多线程,可以带来成倍的性能增长。
- 可是当我们多个线程需要修改同一个数据,在不做任何同步控制的情况下,产生的结果往往是不可预料的,比如两个线程,一个输出hello,一个输出world,实际运行的结果,往往可能是一个是hello world,一个是world hello。
- python里提供了多个用于控制多线程同步的同步原语,这些原语,包含在python的标准库threading.py当中。我今天简单的介绍一下python里的这些控制多线程同步的原语,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以继承这些类,实现自己的同步控制原语。
from threading import Lock, Thread lock = Lock() g = 0 def add_one(): """ Just used for demonstration. It’s bad to use the ‘global’ statement in general. """ global g lock.acquire() g += 1 lock.release() def add_two(): global g lock.acquire() g += 2 lock.release() threads = [] for func in [add_one, add_two]: threads.append(Thread(target=func)) threads[-1].start() for thread in threads: """ Waits for threads to complete before moving on with the main script. """ thread.join() print(g)
# 使用Lock import threading num = 0 lock = Threading.Lock() lock.acquire() num += 1 lock.acquire() # 这个地方阻塞 num += 2 lock.release() # 使用RLock lock = Threading.RLock() lock.acquire() num += 3 lock.acquire() # 这不会阻塞 num += 4 lock.release() lock.release() # 这个地方注意是释放两次锁
import random, time from threading import BoundedSemaphore, Thread max_items = 5 container = BoundedSemaphore(max_items) def producer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") try: container.release() print("Produced an item.") except ValueError: print("Full, skipping.") def consumer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") if container.acquire(False): print("Consumed an item.") else: print("Empty, skipping.") threads = [] nloops = random.randrange(3, 6) print("Starting with %s items." % max_items) threads.append(Thread(target=producer, args=(nloops,))) threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),))) for thread in threads: # Starts all the threads. thread.start() for thread in threads: # Waits for threads to complete before moving on with the main script. thread.join() print("All done.")
- threading模块还提供了一个Semaphore对象,它允许你可以任意次的调用release函数,但是最好还是使用BoundedSemaphore对象,这样在release调用次数过多时会报错,有益于查找错误。Semaphores最长用来限制资源的使用,比如最多十个进程。
import random, time from threading import Event, Thread event = Event() def waiter(event, nloops): for i in range(nloops): print(“%s. Waiting for the flag to be set.” % (i+1)) event.wait() # Blocks until the flag becomes true. print(“Wait complete at:”, time.ctime()) event.clear() # Resets the flag. print() def setter(event, nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) # Sleeps for some time. event.set() threads = [] nloops = random.randrange(3, 6) threads.append(Thread(target=waiter, args=(event, nloops))) threads[-1].start() threads.append(Thread(target=setter, args=(event, nloops))) threads[-1].start() for thread in threads: thread.join() print(“All done.”)
import random, time from threading import Condition, Thread condition = Condition() box = [] def producer(box, nitems): for i in range(nitems): time.sleep(random.randrange(2, 5)) # Sleeps for some time. condition.acquire() num = random.randint(1, 10) box.append(num) # Puts an item into box for consumption. condition.notify() # Notifies the consumer about the availability. print("Produced:", num) condition.release() def consumer(box, nitems): for i in range(nitems): condition.acquire() condition.wait() # Blocks until an item is available for consumption. print("%s: Acquired: %s" % (time.ctime(), box.pop())) condition.release() threads = [] nloops = random.randrange(3, 6) for func in [producer, consumer]: threads.append(Thread(target=func, args=(box, nloops))) threads[-1].start() # Starts the thread. for thread in threads: thread.join() print("All done.")
- conditions还有其他很多用户,比如实现一个数据流API,当数据准备好了可以通知其他线程去处理数据。
from random import randrange from threading import Barrier, Thread from time import ctime, sleep num = 4 b = Barrier(num) names = [“Harsh”, “Lokesh”, “George”, “Iqbal”] def player(): name = names.pop() sleep(randrange(2, 5)) print(“%s reached the barrier at: %s” % (name, ctime())) b.wait() threads = [] print(“Race starts now…”) for i in range(num): threads.append(Thread(target=player)) threads[-1].start() for thread in threads: thread.join() print() print(“Race over!”)