美文网首页
理解Python线程Thread

理解Python线程Thread

作者: 蒋狗 | 来源:发表于2017-02-12 21:22 被阅读0次

Demo代码和引用知识点都参考自<a href="https://gold.xitu.io/post/5845134da22b9d006c2959c3">《理解Python并发编程一篇就够了 - 线程篇》--董伟明</a>或作者个人公众号Python之美, 《Python Cookbook》和<a href="http://www.liaoxuefeng.com/">廖雪峰Python3教程</a>。

GIL

由于CPython全局解释锁,Python利用多线程进行CPU密集的计算型任务时,可能性能会降低。

GIL是必须的,这是Python设计的问题:Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/O操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高(但是当它使用之后介绍的多进程库时,性能可以获得提高。

利用多线程计算斐波那契数。

# -*- coding: utf-8 -*-
# 导入相关依赖
from datetime import datetime
import threading
import time

# 记录时间装饰器
def log_time(func):
    def wrapper(*args, **kwargs):
        # start_time = datetime.now()
        start_time = time.time()
        res = func(*args, **kwargs)
        # end_time = datetime.now()
        end_time = time.time()
        # print('cost %ss' % (end_time - start_time).seconds)
        print('cost %ss' % (end_time - start_time))
        return res
    return wrapper

# 计算斐波那契数方法
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

# 单线程计算两次
@log_time
def single_thread():
    fib(33)
    fib(33)

# 多线程执行
@log_time
def multi_thread():
    for _ in range(2):
        t = threading.Thread(target=fib, args=(33, ))
        t.start()
    main_thread = threading.currentThread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()

single_thread()
multi_thread()
cost 3.089695453643799s
cost 3.232300281524658s

不幸的是运行了几次,始终得到结果multi_thread()耗时没有远大于single_thread(),但也没有达到提高性能的目的。
虽然有GIL,Python多线程仍可用于I/O密集型的场景。

多线程的同步

1.信号量Semaphore

在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是1)。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。

不同于线程池,之前的子线程释放后会创建新的线程,而不是重用线程池里的线程。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import random
import time

sema = threading.Semaphore(3)

def foo(tid):
    # 利用with,代替acquire()和release()
    with sema:
        current_thread_name = threading.currentThread().name
        print('%s: %s acquire sema' % (current_thread_name, tid))
        wt = random.random()
        time.sleep(wt)
    print('%s: %s release sema.' % (current_thread_name, tid))

for i in range(5):
    thread_name = 'thread_' + str(i)
    t = threading.Thread(name=thread_name, target=foo, args=(i, ))
    t.start()

创建信号量时限制了访问资源的线程数量为3,同时最多只有3个线程执行,3次acquire()后,计数器成0,线程调用acquire()被阻塞,等到某个线程release()后,之后的线程才能acqure(),实际还是创建了5个线程。

2.同步锁Lock

互斥锁,相当于信号量为1。在<a href='http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143192823818768cd506abbc94eb5916192364506fa5d000'>廖雪峰Python教程 -- 多线程</a>中也解释了Lock()。
若没有加上互斥锁。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

balance = 0  # 余额

def deposit(money):
    global balance
    balance += money

def excute_method(money):
    for i in range(100000):
        deposit(money)

t_1 = threading.Thread(name='t_1', target=excute_method, args=(1, ))
t_2 = threading.Thread(name='t_2', target=excute_method, args=(1, ))

t_1.start()
t_2.start()
t_1.join()
t_2.join()

print('balance is %s' % balance)

最终的结果远小于100000*2。

balance is 37564

因为在deposit()balance += money,CPU执行时实际会被拆分成

temp = balance + money
balance = temp

对于每个线程,temp是成员变量,而balance是公有的,在多线程执行时,t_1t_2交替运行,会导致balance最终的值异常,所以需要对balance加锁。
之前的Demo是两个线程同时循环累加100000次,下面是500个线程同时加1。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

lock = threading.Lock()
balance = 0

def deposit(money):
    global balance
    with lock:
        # balance += money
        temp = balance + money
        time.sleep(0.01)  # 使线程有时间切换
        balance = temp

threads = []

for i in range(500):
    t = threading.Thread(target=deposit, args=(1, ))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print('balance is %s' % balance)

最终的到的结果始终是500。

balance is 500

3.可重入锁RLock

...

4.条件变量Condition

一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者/消费者」模型:

以及下面的事件Event,队列Queue都将用生产者/消费者模型来举例。
Condition: 主要的方法是wait()notifyAll()notify(),同时还提供了类似Lock()acquire()release()
wait(): 创建了一个名为waiter的锁,并且设置锁的状态为locked,进入阻塞状态,直至收到一个notify()通知。这个waiter锁用于线程间的通讯。
notify()notifyAll(): 释放waiter锁,唤醒线程。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time

# 生产者
def producer(cond):
    current_thread_name = threading.currentThread().name
    with cond:
        time.sleep(0.1)
        print('%s: make resource available.' % current_thread_name)
        cond.notifyAll()  # 唤醒消费者线程

# 消费者
def consumer(cond):
    current_thread_name = threading.currentThread().name
    with cond:
        cond.wait()  # 创建了一个名为waiter的锁,并且设置锁的状态为locked。这个waiter锁用于线程间的通讯。
        print('%s: Resource is available to consumer' % current_thread_name)

cond = threading.Condition()

p1 = threading.Thread(name='p1', target=producer, args=(cond, ))
c1 = threading.Thread(name='c1', target=consumer, args=(cond, ))
c2 = threading.Thread(name='c2', target=consumer, args=(cond, ))

c1.start()
c2.start()
time.sleep(0.1)
p1.start()

生产者发出的消息被消费者接收到了,但需要注意的是,在这个Demo中消费者线程需要比生产者线程先执行获取waiter锁,否则会出现问题。(可能是因为consumer还没获取到锁,而producer已经执行了notifyAll()唤醒操作?)
问题1: 感觉上述例子太过于简单,没有很好的说明Condition的用法,且notifyAll()不是很常用?
参考网上的示例写了另一个Demo,如下:

import threading  
import time  
import queue
   
condition = threading.Condition()  
# products = 0  # 改为产品队列
products = queue.Queue(10)
count = 20  # 最多生产20个
# done = False  # 结束标志

class Producer(threading.Thread):  
    def __init__(self):  
        threading.Thread.__init__(self)  
          
    def run(self):  
        global condition, products, count, done
        while count > 0:
            if condition.acquire():  
                if not products.full():  
                    products.put(1)
                    print("Producer(%s):deliver one, now products:%s" %(self.name, products.qsize()))
                    condition.notify()  
                    count -= 1
                else:  
                    print("Producer(%s):already 10, stop deliver, now products:%s" %(self.name, 0))
                    condition.wait()  
                condition.release()
                time.sleep(0.5)
        # done = True
        print('break producer')
          

class Consumer(threading.Thread):  
    def __init__(self):  
        threading.Thread.__init__(self)  
          
    def run(self):  
        global condition, products, done
        # while not done:
        while True:
            if condition.acquire():  
                if not products.empty():  
                    n = products.get()
                    time.sleep(0.5)
                    print("Consumer(%s):consume one, now products:%s" % (self.name, n)) 
                    condition.notify()  
                else:  
                    print("Consumer(%s):only 0, stop consume, products:%s" % (self.name, 0))
                    condition.wait()
                condition.release()  
                time.sleep(0.5)
        print('break consumer')
                  
                  
threads = []

for p in range(0, 5):  
    p = Producer()  
    p.start()
    threads.append(p)
      
for c in range(0, 10):  
    c = Consumer()
    c.start()
    threads.append(c)

for t in threads:
    t.join()

print('end main')

5.事件Event

一个线程发送/传递事件,另外的线程等待事件的触发。

可用于线程间的通信,主线程对子线程的控制。<a href="http://blog.csdn.net/cnweike/article/details/40821283">Python中使用threading.Event协调线程的运行</a>,该博客举了一个利用Event来协调线程运行的场景,感觉比下面的Demo举例好。
Event 主要有set()clear()wait()方法。
原文中Event的例子是无限循环且不会退出的,稍作修改。

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import time
import random

def producer(event, l):
    current_thread_name = threading.currentThread().name
    count = 10
    while count > 0:
        n = random.randint(10, 100)
        l.append(n)
        print('%s: %s appended to list' % (current_thread_name, n))
        count -= 1
        event.set()
        # 若该处不设置time.sleep()则可能生产者执行后,消费者执行时只会pop最新的
        # 可能有之前的没有pop出来,但又append了新的元素。
        time.sleep(0.1) 

def consumer(event, l):
    current_thread_name = threading.currentThread().name
    while 1:
        event_is_set = event.wait(2)  # 设置超时时间,超时后break
        if event_is_set:
            try:
                n = l.pop()
                print('%s: %s poped from list' % (current_thread_name , n))
                event.clear()  # 清空事件状态
            except IndexError:  # 为了让刚启动时容错
                pass    
        else:
            break

event = threading.Event()
l = []

p1 = threading.Thread(name='p1', target=producer, args=(event, l))
c1 = threading.Thread(name='c1', target=consumer, args=(event, l))
c2 = threading.Thread(name='c2', target=consumer, args=(event, l))

p1.start()
c1.start()
c2.start()

问题2: Event和Condition的异同?各自用于什么场景。《Python CookBook》 12.2 提及了Event和Condition。

event 对象最好单次使用,就是说,你创建一个event 对象,让某个线程等待这个
对象,一旦这个对象被设置为真,你就应该丢弃它。尽管可以通过clear() 方法来重
置event 对象,但是很难确保安全地清理event 对象并对它重新赋值。很可能会发生错
过事件、死锁或者其他问题(特别是,你无法保证重置event 对象的代码会在线程再
次等待这个event 对象之前执行)。如果一个线程需要不停地重复使用event 对象,你
最好使用Condition 对象来代替。

6.队列Queue

队列是线程,进程安全的,是很常见的并发编程时用到的数据结构。
Queue 主要有put()get()join()empty()等方法。
put() : 往队列里添加一项。
get() : 从队列中取出一项。
empty() : 判断队列是否为空。
join(): 阻塞直至队列中项目执行完毕。
task_done() : 在某一项任务完成时调用。
ps:multiprocessing模块也有Queue,但他不支持join()task_done(),可以使用模块下的JoinableQueue
使用队列模拟生产者/消费者模型:

# -*- coding: utf-8 -*-
# 导入相关依赖
import threading
import queue
import time
import random

def double(n):
    return n * 2

q = queue.Queue()

def producer():
    count = 15
    current_thread_name = threading.currentThread().name
    while count > 0:
        n = random.randint(10, 100)
        q.put((double, n))
        # time.sleep(0.5)  # 若在这里有个耗时操作则该Demo的消费者会直接break
        print('%s: put %s in to queue.' % (current_thread_name, n))
        count -= 1


def consumer():
    current_thread_name = threading.currentThread().name
    while 1:
        if q.empty():
            break
        task, arg = q.get()
        res = task(arg)
        time.sleep(0.1)  # 耗时操作让线程可以切换
        print('%s: result is %s.' % (current_thread_name, res))
        q.task_done()

p1 = threading.Thread(name='p1', target=producer)
c1 = threading.Thread(name='c1', target=consumer)
c2 = threading.Thread(name='c2', target=consumer)

# c1.setDaemon(True)
# c2.setDaemon(True)

p1.start()
c1.start()
c2.start()

问题3: 在上述Demo中,若生产者线程阻塞了,那消费者线程不就先启动然后直接break了?(1.尝试把break去掉,让消费者线程一直执行,为让其正常结束,将消费者线程设置为守护线程,并在最后对队列进行join()阻塞保证正常执行。2.利用Event和Queue, Condition和Queue。)

除了普通的队列外还有优先级队列PriorityQueue(),会按传入的优先级来get()并返回。

7.线程池

通过线程池来创建并重复利用和销毁线程,避免过多的创建销毁线程所产生的花销。
内置线程池的map()

from multiprocessing.pool import ThreadPool
pool = ThreadPool(3)
pool.map(lambda x : x * 2, [1, 2, 3])

利用队列简单实现线程池:

# 导入相关依赖
import threading
import time
import queue

class Worker(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self._q = q
        self.daemon = True  # 守护线程
        self.start()
    
    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            res = f(*args, **kwargs)
            print('%s: result is %s.' % (self.name, res))
            self._q.task_done()

class CostumePool():
    def __init__(self, pool_size):
        self._q = queue.Queue(poo_size)
        for _ in range(pool_size):
            Worker(self._q)

    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))

    def wait_complete(self):
        self._q.join()

def double(n):
    return n * 2

pool = CostumePool(3)
for i in range(10):
    pool.add_task(double, i)
    time.sleep(0.1)

pool.wait_complete()

最多只会创建3个子线程,当前任务执行完后复用,执行新的任务。

相关文章

网友评论

      本文标题:理解Python线程Thread

      本文链接:https://www.haomeiwen.com/subject/qcvwittx.html