美文网首页
线程-锁

线程-锁

作者: 小吉头 | 来源:发表于2020-08-17 09:34 被阅读0次

    场景

    在线程中,举例了多线程操作全局变量时是不安全的,因为底层的字节码并不能一次全部执行完成

    import threading
    import time
    
    num = 0
    
    def test():
        global num
        for i in range(100000):
            num += 1
        print('t1结束')
    
    def test1():
        global num
        for i in range(100000):
            num += 1
        print('t2结束')
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=test)
        t2 = threading.Thread(target=test1)
        t1.start()
        t2.start()
        time.sleep(1)
        print(num)
    >>>t1结束
    >>>t2结束
    >>>176111
    

    线程锁的使用

    import threading
    import time
    
    num = 0
    lock = threading.Lock()
    
    def test():
        global num
        for i in range(100000):
            lock.acquire()
            num += 1
            lock.release()
        print('t1结束')
    
    def test1():
        global num
        for i in range(100000):
            lock.acquire()
            num += 1
            lock.release()
        print('t2结束')
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=test)
        t2 = threading.Thread(target=test1)
        t1.start()
        t2.start()
        time.sleep(1)
        print(num)
    
    >>>t1结束
    >>>t2结束
    >>>200000
    

    假设t1先获取到锁,t2执行到acquire()时会处于阻塞状态
    通过lock可以确保num+1完整的执行,即底层的字节码会全部执行完成才切换到其他线程
    锁的使用虽然解决了多线程共享变量的问题,但也带来了其他问题:
    1、获取锁和释放锁需要开销,导致性能下降
    2、容易造成死锁

    死锁的情况

    一、单个线程内再次获取锁

    def test():
        global num
        for i in range(100000):
            lock.acquire()
            lock.acquire() #多获取了一次
            num += 1
            lock.release()
        print('t1结束')
    

    一个线程中重复获取锁,第二次获取时会等待锁的释放,但是又没有释放,会导致代码阻塞

    二、多个线程互相等待

    现在假设有两把锁,让线程t1先获取到lock_a然后等待t2获取到lock_b,再获取对方已获取的锁,导致了死锁

    import threading
    import time
    
    num = 0
    lock_a = threading.Lock()
    lock_b = threading.Lock()
    
    def test():
        global num
        for i in range(100000):
            lock_a.acquire()
            time.sleep(1)
            lock_b.acquire()
            num += 1
            lock_b.release()
            lock_a.release()
        print('t1结束')
    
    def test1():
        global num
        for i in range(100000):
            lock_b.acquire()
            lock_a.acquire()
            num += 1
            lock_a.release()
            lock_b.release()
        print('t2结束')
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=test)
        t2 = threading.Thread(target=test1)
        t1.start()
        t2.start()
        time.sleep(1)
        print(num)
    
    >>>0
    >>>一直等待...
    

    可重入锁-RLock

    同一个线程中,可以多次调用acquire(),但是acquire()release()的次数要一样

    import threading
    import time
    
    num = 0
    lock = threading.RLock() #使用可重入锁
    
    def test():
        global num
        for i in range(100000):
            lock.acquire()
            lock.acquire()#重复获取不会导致死锁
            num += 1
            lock.release()#释放
            lock.release()
        print('t1结束')
    ...
    

    条件锁condition的使用

    通过条件锁,可以实现多线程的协同工作,本例实现轮流执行的效果

    import threading
    
    
    class XiaoAi(threading.Thread):
        def __init__(self,cond):
            super().__init__()
            self.name = '小爱'
            self.cond = cond
    
        def run(self):
            with self.cond:#获取Condition初始化时的RLock锁,后面简称RLock锁
                print(1)
                self.cond.wait()#创建一个Lock锁放到双端队列中,此时代码阻塞并一直尝试获取该锁。然后释放RLock锁
                print('{} : 在'.format(self.name))
                self.cond.notify()
                self.cond.wait()
    
                print('{} : 好啊'.format(self.name))
    
    
    
    class TianMao(threading.Thread):
        def __init__(self,cond):
            super().__init__()
            self.name='天猫精灵'
            self.cond = cond
    
        def run(self):
            #with等价于self.cond.acquire()   self.cond.release()
            with self.cond:#tianmao线程等待获取RLock锁,等待xiaoai线程释放后获取到
                print('{} : 小爱同学'.format(self.name))
                self.cond.notify()#从双端队列取出Lock锁,并释放。由于xiaoai线程一直在wait(),这里一旦释放,xiaoai线程可以立刻获取到锁开始执行
                self.cond.wait()
    
                print('{} : 我们说说话吧'.format(self.name))
                self.cond.notify()
    
    if __name__ == "__main__":
        cond = threading.Condition()
        xiaoai = XiaoAi(cond)
        tianmao = TianMao(cond)
    
        xiaoai.start()#xiaoai线程先执行
        tianmao.start()
    
    >>>1
    >>>天猫精灵 : 小爱同学
    >>>小爱 : 在
    >>>天猫精灵 : 我们说说话吧
    >>>小爱 : 好啊
    

    1、xiaoai.start()要放在tianmao.start()前执行,保证xiaoai线程先进入等待状态,tianmao线程调用notify()方法xiaoai才能继续执行,如果tianmao线程先执行发送了notify()后,xiaoai线程还没有调用wait(),代码的结果就只打印了天猫精灵 : 小爱同学
    问题:经过多次测试,调换顺序后结果确实是只打印了天猫精灵 : 小爱同学。即使先让tianmao线程start,我觉得xiaoai线程也是有可能先执行到的,因为当tianmao线程启动后,恰好cpu分配给了主线程让xiaoai线程也启动了起来,这时有三个线程,然后恰好cpu又分配给了xiaoai线程先执行了一会,不就出现了正确的结果了吗?
    2、一开始是xiaoai先获取RLock,然后调用wait(),wait()方法里又创建了一把Lock,wait()源码如下,调用acquire()尝试获取Lock,然后释放了RLock,但是如果一开始就获取到了Lock xiaoai线程不就可以继续执行了吗,理论上应该是阻塞了,等待tianmao调用notify()去释放Lock,如果xiaoai线程阻塞的话,RLock又是怎么释放的?tianmao如何拿到RLock呢

            waiter = _allocate_lock()
            waiter.acquire()
            self._waiters.append(waiter)
            saved_state = self._release_save()
    

    控制线程并发执行数量

    threading.Semaphore类内部使用了threading.Condition,可以控制每次并发的线程数量
    不控制并发数量的情况,下面的代码产生20并发大概2秒左右全部完成

    import threading
    import time
    
    class DealData(threading.Thread):
        def run(self):
            time.sleep(2)
            print("get data ok")
    
    if __name__ == "__main__":
        for i in range(20):
            t = DealData()
            t.start()
    

    使用Semaphore控制并发数量,可以避免过多的并发数量,比如爬虫场景

    import threading
    import time
    
    class DealData(threading.Thread):
        def __init__(self,sem):
            super().__init__()
            self.sem = sem
    
        def run(self):
            time.sleep(2)
            print("get data ok")
            self.sem.release()
    
    
    if __name__ == "__main__":
        sem = threading.Semaphore(3)
        for i in range(20):
            sem.acquire()
            t = DealData(sem)
            t.start()
    

    线程池概念

    创建固定数量的线程(有任务才创建,会有个最大值),线程池循环利用里面的线程去执行多任务,避免频繁创建销毁的开销
    上面的threading.Semaphore有点线程池的意思,但只是做了并发数量的限制。
    通过from concurrent.futures import ThreadPoolExecutor除了可以设置并发数量,还能在主线程中获取子线程的状态和返回值。

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def get_html(sleep_time):
        time.sleep(sleep_time)
        print('get {}'.format(sleep_time))
        return sleep_time
    
    #创建线程池,设置最多2个并发
    executor = ThreadPoolExecutor(max_workers=2)
    #通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回。返回的是from concurrent.futures import Future,Future类对象
    task1 = executor.submit(get_html,3)
    task2 = executor.submit(get_html,2)
    task3 = executor.submit(get_html,4)
    
    print(task3.cancel())#True,最大并发是2,没有开始的任务可以取消
    print(task1.cancel())#False,执行中或执行完成的线程无法取消
    print(task1.done())#False
    print(task1.result())#result()使主线程阻塞,等待子线程执行完
    print(task1.done())#True
    

    获取已完成的任务

    from concurrent.futures import as_completedas_completed()是一个生成器,内部判断一旦有任务完成,yield返回该任务。

    from concurrent.futures import ThreadPoolExecutor,as_completed
    import time
    
    def get_html(sleep_time):
        time.sleep(sleep_time)
        print('get {}'.format(sleep_time))
        return sleep_time
    
    #创建线程池,设置最多2个并发
    executor = ThreadPoolExecutor(max_workers=2)
    #通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回
    task1 = executor.submit(get_html,3)
    task2 = executor.submit(get_html,2)
    task3 = executor.submit(get_html,4)
    
    
    all_task = [task1,task2,task3]
    for future in as_completed(all_task):
        data = future.result()
        print('get {} success'.format(data))
    
    print('done')
    
    >>>get 2
    >>>get 2 success
    >>>get 3
    >>>get 3 success
    >>>get 4
    >>>get 4 success
    >>>done
    

    as_completed()会阻塞,一直到获取到所有成功的任务,最后打印了done

    主线程等待某些线程完成

    使用wait()函数可以指定主线程等待哪些线程完成后可以继续

    from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETED
    import time
    
    def get_html(sleep_time):
        time.sleep(sleep_time)
        print('get {}'.format(sleep_time))
        return sleep_time
    
    #创建线程池,设置最多2个并发
    executor = ThreadPoolExecutor(max_workers=2)
    #通过submit()函数提交任务到线程池,submit()是非阻塞,立刻返回
    task1 = executor.submit(get_html,3)
    task2 = executor.submit(get_html,2)
    task3 = executor.submit(get_html,4)
    
    all_task = [task1,task2,task3]
    wait(all_task,return_when=ALL_COMPLETED) #如果换成FIRST_COMPLETED,输出:get  2、done 、get 3、get 4
    print('done')
    
    >>>get 2
    >>>get 3
    >>>get 4
    >>>done
    

    相关文章

      网友评论

          本文标题:线程-锁

          本文链接:https://www.haomeiwen.com/subject/lbkzdktx.html