美文网首页
python线程的一些知识

python线程的一些知识

作者: DayBreakL | 来源:发表于2020-05-16 11:31 被阅读0次

    python threading模块

    import threading
    import time
    
    def thread_demo():
        print("===线程,来自方法==={}".format(threading.current_thread()))
        sum = 0
        for i in range(1000):
            time.sleep(0.01) # 测试子线程
            sum += i
    
        print(sum)
    
    print("===线程,创建子线程前==={}".format(threading.current_thread()))
    # 创建线程
    t1 = threading.Thread(target=thread_demo)  # 参数target 指定要执行的任务;参数args 指定方法传递的参数
    t2 = threading.Thread(target=thread_demo)
    # 启动创建的线程,让线程去执行任务
    t1.start()
    t2.start()
    print("====线程数===={}".format(threading.enumerate()))
    print("====线程数===={}".format(threading.active_count()))
    # 阻塞当前线程,默认等待指定的线程执行结束
    t1.join() # 等t1回来再继续往下执行
    t2.join() # 等t2回来再继续往下执行
    print("===线程==={}".format(threading.current_thread()))
    # 主线程就是代码从上往下运行
    print("主线程结束")
    
    

    重写threading.Thread来实现:

    import threading
    import time
    
    def thread_demo():
        print("===线程,来自方法==={}".format(threading.current_thread()))
        sum = 0
        for i in range(1000):
            time.sleep(0.01) # 测试子线程
            sum += i
    
        print(sum)
    # 重写threading,继承threading.Thread
    class MyThread(threading.Thread):
    
        # 重写run方法,待执行的任务
        def run(self):
            thread_demo() #
    
    print("===线程,创建子线程前==={}".format(threading.current_thread()))
    # 创建线程
    m1=MyThread()
    m2=MyThread()
    # 运行线程
    m1.start()
    m2.start()
    print("====线程数===={}".format(threading.enumerate()))
    print("====线程数===={}".format(threading.active_count()))
    # 阻塞当前线程,默认等待指定的线程执行结束
    m1.join()
    m2.join()
    print("主线程结束")
    

    守护线程:

    # 守护线程
    def thread_demo():
        print("===线程,来自方法==={}".format(threading.current_thread()))
        sum = 0
        for i in range(1000):
            time.sleep(0.01) # 测试子线程
            sum += i
    
        print(sum)
    
    
    t1=threading.Thread(target=thread_demo)
    t1.daemon=True # 表示当前线程就是t1的守护线程 ,通过注释更好理解,主线程结束,守护线程也会结束
    t1.start()
    
    time.sleep(2)
    print("主线程结束")
    

    线程的生命周期:


    • 实例化threading.Thread时,创建了线程 --->新建
    • start() 准备干活-->就绪 ,等待CPU的调度
    • cpu的切换分为2种:
      1. 就绪 <---> 运行 反复切换 (高频率计算的)
      2. 就绪<--->运行--->阻塞--->就绪--->运行。对于一些需要等待结果的,比如等待数据库返回、比如time.sleep(),cpu就会把状态由运行改为阻塞,不会再去调度你。
    • 直到你等待的结果回来,比如数据库返回了、sleep()结束。你的状态由阻塞变为就绪,等待cpu重新调度。
    • 运行-->死亡 计算结束、报异常等,线程死亡

    同步锁和互斥锁

    a = 0
    def add_one():
        global a
        for i in range(10000000):
            b = a + 1
            a = b
    
    
    # 创建线程
    thread_list = [threading.Thread(target=add_one) for i in range(5)]
    
    for i in thread_list:
        i.start()
    
    for i in thread_list:
        i.join()
    
    print(a)  # 远小于500000000,
    
    为什么a远小于500000000
    • e.g. 线程1 b=1000 线程2 b=2000 线程3 b=3000 线程4 b=4000 线程5 b=5000 ,此时cpu再开始到线程1 a=b=1000,那么相当于后面线程的都是无用功
    • 怎么办呢?加锁
    互斥锁
    a = 0
    
    lock=threading.Lock() # 创建一把锁 互斥锁
    def add_one():
        # 加锁
        lock.acquire() # 已锁定,其他线程就无法加锁成功
        global a
        for i in range(10000000):
            b = a + 1
            a = b
        lock.release() # 结束锁定
    
    #  创建线程
    thread_list = [threading.Thread(target=add_one) for i in range(5)]
    
    for i in thread_list:
        i.start()
    
    for i in thread_list:
        i.join()
    
    print(a)  # 500000000,
    
    死锁
    a = 0
    
    lock=threading.Lock() # 创建一把锁 互斥锁
    def add_one():
        # 加锁
        lock.acquire() # 已锁定,其他线程就无法加锁成功
        global a
        for i in range(10000000):
            b = a + 1
            raise NameError("线程挂掉了") # 假设在加锁后、未释放锁之前挂掉了
            a = b
        lock.release() # 结束锁定
    
    #  创建线程
    thread_list = [threading.Thread(target=add_one) for i in range(5)]
    
    for i in thread_list:
        i.start()
    
    for i in thread_list:
        i.join()
    
    print(a)  # 500000000,
    

    加了代码raise NameError("线程挂掉了") 模拟线程在加锁后、未释放锁之前挂掉了,那么程序会一直卡在那里,该线程无法继续,其他线程要等待加锁的线程释放,导致也无法运行。这种情况就是死锁。

    通过try... finally... ,无论结果如何,都要释放锁,这样程序不会卡在那里

    lock=threading.Lock() # 创建一把锁 互斥锁
    def add_one():
        # 加锁
        lock.acquire() # 已锁定,其他线程就无法加锁成功
        try:
            global a
            for i in range(10000000):
                b = a + 1
                raise NameError("线程挂掉了")
                a = b
        finally:
            lock.release() # 结束锁定
    
    逻辑死锁
    a = 0
    
    lock_1=threading.Lock() # 互斥锁
    lock_2=threading.Lock() # 互斥锁
    
    def add_one():
    
        print("线程{}进入方法add_one".format(threading.current_thread()))
        lock_1.acquire() # 线程2 lock_1 加锁
        lock_2.acquire() # 
        print("线程{}开始计算".format(threading.current_thread()))
        lock_1.release()  # 
        time.sleep(0.1) # 线程1 阻塞  lock_2锁定,休眠时间过了之后,lock_1加锁,但lock_1已被占用,
        lock_1.acquire()
        global a
        for i in range(10000000):
            b = a + 1
            raise NameError("线程挂掉了")
            a = b
    
        lock_2.acquire()
        lock_1.release()
        print("线程{}退出方法add_one".format(threading.current_thread()))
    
    
    # 创建线程
    thread_list = [threading.Thread(target=add_one) for i in range(5)]
    
    for i in thread_list:
        i.start()
    
    for i in thread_list:
        i.join()
    
    print(a)  
    

    锁的滥用容易出现逻辑死锁,故应避免使用多把锁。

    线程间的通信

    线程间的通信:

    Condition线程通信

    消费者-生产者设计模式
    con=threading.Condition() # Condition默认自带互斥锁
    
    ball=[]
    def set_data():
    
        for i in range(10):
            con.acquire()
            for j in range(20):
                ball.append("丸子")
            print("第{}个丸子放进去了,吃货们快来吃".format(len(ball)))
            con.notify_all() # 激活其他所有线程进入就绪状态
            con.wait() # wait()释放当前锁,并进入阻塞状态
    
    def get_data():
        while True:
            con.acquire() #加锁
            time.sleep(0.5)
            if(len(ball)==0):
                print("锅里没丸子了")
                con.notify_all()
                con.wait()
            else:
                ball.pop()
                print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),len(ball)))
            con.release()
    
    product=threading.Thread(target=set_data)
    customers=[threading.Thread(target=get_data) for i in range(3)]
    
    product.start()
    for i in customers:
        i.start()
    
    product.join()
    

    Queue队列实现线程通信

    
    con=threading.Condition() # Condition默认自带互斥锁
    
    # 用队列储存丸子
    q=queue.Queue(maxsize=10) # 创建队列
    def set_data():
    
        for i in range(10):
            con.acquire()
            for j in range(20):
                if q.full(): # 判断队列是否已满
    
                    print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                    con.notify_all()  # 激活其他所有线程进入就绪状态
                    con.wait()  # wait()释放当前锁,并进入阻塞状态
                else:
                    q.put("丸子") # 往队列里放东西
    
    
    
    def get_data():
    
        while True:
            con.acquire() #加锁
            time.sleep(0.5)
            if q.empty(): #判断队列是否为空
                print("锅里没丸子了")
                con.notify_all()
                con.wait()
            else:
                q.get()  # 获取队列中的数据
                print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
            con.release()
    
    product=threading.Thread(target=set_data)
    customers=[threading.Thread(target=get_data) for i in range(3)]
    
    product.start()
    for i in customers:
        i.start()
    
    product.join()
    

    通过Event实现通信 Event可以代替Condition

    product_env=threading.Event() # 生产者和消费者不能共用一个evn
    customer_env=threading.Event()
    
    # 用队列储存丸子
    q=queue.Queue(maxsize=10) # 创建队列
    def set_data():
    
        for i in range(10):
            for j in range(20):
                if q.full(): # 判断队列是否已满
    
                    print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                    customer_env.set() # 通知消费者
                    product_env.clear() # 把生产者线程里的flag设置为 False
                    product_env.wait() # 阻塞当前线程
                else:
                    q.put("丸子") # 往队列里放东西
    
    
    
    def get_data():
    
        while True:
            time.sleep(0.5)
            if q.empty(): #判断队列是否为空
                print("锅里没丸子了")
                product_env.set()
                customer_env.clear()
                customer_env.wait()
            else:
                q.get()  # 获取队列中的数据
                print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
    
    
    product=threading.Thread(target=set_data)
    customers=[threading.Thread(target=get_data) for i in range(3)]
    
    product.start()
    for i in customers:
        i.start()
    
    product.join()
    

    线程池

    线程池
    product_env=threading.Event() # 生产者和消费者不能共用一个evn
    customer_env=threading.Event()
    
    q=queue.Queue(maxsize=10) # 创建队列
    def set_data():
    
        for i in range(10):
            for j in range(20):
                if q.full(): # 判断队列是否已满
    
                    print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                    customer_env.set() # 通知消费者
                    product_env.clear() # 把生产者线程里的flag设置为 False
                    product_env.wait() # 阻塞当前线程
                else:
                    q.put("丸子") # 往队列里放东西
        return "没丸子,别吃了"
    
    def get_data():
    
        while True:
            time.sleep(0.5)
            if q.empty(): #判断队列是否为空
                print("锅里没丸子了")
                product_env.set()
                customer_env.clear()
                customer_env.wait()
            else:
                q.get()  # 获取队列中的数据
                print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
    
        def get_result(feature):
            print(feature.result())
    
    pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数
    product_future=pool.submit(set_data)# 往线程池中添加任务,并激活线程中的线程,即改为就绪状态
    product_future.add_done_callback() # 等同于result()
    customer_future1=pool.submit(get_data) #
    customer_future2=pool.submit(get_data)
    customer_future3=pool.submit(get_data)
    
    # res=product_future.result()  #result方法,获取程序的返回值
    # print(res)
    
    pool.shutdown() # 关闭线程池
    

    map方法:

    # 使用map方法,map使用于有参的方法
    product_env=threading.Event() # 生产者和消费者不能共用一个evn
    customer_env=threading.Event()
    
    q=queue.Queue(maxsize=10) # 创建队列
    def set_data(d=None):
    
        for i in range(10):
            for j in range(20):
                if q.full(): # 判断队列是否已满
    
                    print("当前有{}个丸子,吃货们来吃吧".format(q.qsize())) #q.qsize()返回队列中数据的个数
                    customer_env.set() # 通知消费者
                    product_env.clear() # 把生产者线程里的flag设置为 False
                    product_env.wait() # 阻塞当前线程
                else:
                    q.put("丸子") # 往队列里放东西
        return "没丸子,别吃了"
    
    def get_data(d=None):
    
        while True:
            time.sleep(0.5)
            if q.empty(): #判断队列是否为空
                print("锅里没丸子了")
                product_env.set()
                customer_env.clear()
                customer_env.wait()
            else:
                q.get()  # 获取队列中的数据
                print("吃货{}夹走了一个丸子,锅里还剩下{}个丸子".format(threading.current_thread(),q.qsize()))
    
        def get_result(feature):
            print(feature.result())
    
    pool=ThreadPoolExecutor(max_workers=4) # 创建线程池,max_workers 指定最大线程数
    
    product=pool.map(set_data,(None,))
    customer=pool.map(get_data,(None,None,None)) #启动三次
    
    # 获取返回值
    print(next(product))
    
    pool.shutdown() # 关闭线程池
    

    相关文章

      网友评论

          本文标题:python线程的一些知识

          本文链接:https://www.haomeiwen.com/subject/ljulohtx.html