美文网首页Python精选
Python3多线程threading介绍(转载)

Python3多线程threading介绍(转载)

作者: CurryCoder | 来源:发表于2019-08-18 11:53 被阅读0次

    多线程介绍

    • 在python3中,通过该threading模块提供线程的功能。原来的thread模块已经废弃。但是,threading模块中有个Thread类是模块中最主要的线程类,一定要记住!!!
    • threading模块提供了一些实用的方法或属性,例如:


      threading模块的方法和属性.png
    • theading模块包含以下的类:
      • Thread: 基本线程类
      • Lock:互斥锁
      • RLock:可重入锁,使单一进程再次获得已持有的锁(递归锁)
      • Condition:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。
      • Semaphore:信号锁,为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞。
      • Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活。
      • Timer:一种计时器
      • Barrier:Python3.2新增的“阻碍”类,必须达到指定数量的线程后才可以继续执行。

    1.多线程

    • 有两种方法来创建多线程:一种是继承Thread类,并重写它的run()方法;另一种是实例化threading.Thread对象时,将线程要执行的任务函数作为参数传入线程。
    • 第一种方法:
      import threading
      
      class MyThread(threading.Thread):
          def __init__(self, thread_name):
              super(MyThread, self).__init__(name = thread_name)
      
          # 重写run()方法
          def run(self):
              print("%s正在运行中......" % self.name)
      
      for i in range(10):
          MyThread("thread-" + str(i)).start()   # 启动线程
      
    • 第二种方法:
      import threading
      import time
      
      def show(arg):
          time.sleep(1)
          print("thread " + str(arg) + " running......")
      
      for i in range(10):
          t = threading.Thread(target=show, args=(i,))  # 注意传入的参数一定是一个元组!
          t.start()
      
      
    • 对于Thread类,它的定义如下:
      threading.Thread(self, group=None, target=None, name=None,agrs=(),kwargs=None, *, daemon=None)
      
      • 参数group是预留的,用于将来扩展
      • 参数target是一个可调用对象,在线程启动后执行
      • 参数name是线程的名字。默认值为“Thread-N“,N是一个数字
      • 参数args和kwargs分别表示调用target时的参数列表和关键字参数
    • Thread类定义的常用方法和属性


      Thread类的方法和属性.png
    • 在多线程执行过程中,有一个特点要注意,每个线程各自执行自己的任务,不等待其他的线程,自顾自的完成自己的任务,例如下面的例子:
      import time
      import threading
      
      def doWaiting():
          print("开始等待:", time.strftime('%H:%M:%S'))
          time.sleep(3)
          print("结束等待:", time.strftime("%H:%M:%S"))
      
      t = threading.Thread(target=doWaiting)
      t.start()
      
      time.sleep(1)  # 确保线程已经启动
      print("开始工作")
      print("结束工作")
      
    • 分析上述过程:Python默认会等待最后一个线程执行完毕后才退出。上面例子中,主线程没有等待子线程t执行完毕,而是啥都不管,继续往下执行它自己的代码,执行完毕后也没有结束整个程序,而是等待子线程t执行完毕,整个程序才结束
    • 有时候我们希望主线程等等子线程,不要“埋头往前跑”。那要怎么办?使用join()方法!,如下所示:
      import threading
      import time
      
      def doWaiting():
          print("开始等待: ", time.strftime("%H:%M:%S"))
          time.sleep(3)
          print("结束等待:", time.strftime("%H:%M:%S"))
      
      t = threading.Thread(target=doWaiting)
      
      t.start()
      # 确保线程t已经启动
      time.sleep(1)
      
      print("开始阻塞主线程,等待子线程执行")
      t.join()   # 主线程不要着急走,等等子线程吧!!! 将一直堵塞,直到t运行结束
      print("子线程执行完,结束阻塞,主线程继续执行!")
      
    • 还可以使用setDaemon(True)吧所有的子线程都变成主线程的守护进程。当主线程结束后,守护子进程也会随之结束,整个程序也跟着退出。
      import threading
      import time
      
      def run():
          print(threading.current_thread().getName(), "开始工作")
          time.sleep(2)  # 子线程停两秒
          print("子线程工作执行完成!")
      
      for i in range(3):
          t = threading.Thread(target=run)
          t.setDaemon(True)   # 把子线程设置为守护进程,必须在start()之前设置!!!
          
          t.start()
          
      time.sleep(1)  # 主线程停1s
      print("主线程结束运行...")
      print(threading.active_count())  # 输出活跃的线程数量
      

    2.自定义线程类

    • 对于threading模块的Thread类,本质上是执行了它的run()方法。因此可以字定义线程类,让它继承Thread类,然后重新run()方法即可。
      import threading
      
      class MyThreading(threading.Thread):
          def __init__(self, func, arg):
              super(MyThreading, self).__init__()
              self.func = func
              self.arg = arg
          # 重写run()方法
          def run(self):
              self.func(self.arg)
      
      
      def my_func(args):
          '''
          此处可以把你想让线程做的事定义在这里
          '''
          print("我是业务函数...")
          pass
      obj = MyThreading(my_func, 123)
      obj.start()
      

    3.线程锁

    • 由于线程之间的任务执行是CPU进行随机调度的,并且每个线程可能只执行了n条指令之后就被切换到别的线程了。当多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,这被称为“线程不安全”。为了保证数据安全,我们设计了线程锁,即同一时刻只允许一个线程操作该数据。线程锁用于锁定资源,可以同时使用多个锁,当你需要独占某一资源时,任何一个锁都可以锁这个资源,就好比你用不同的锁都可以把相同的一个箱子锁住是一个道理。
    • 没有锁的情况下,脏数据是如何产生的
      import threading
      import time
      
      number = 0
      
      def plus():
          global number        # global声明此处的number是外面的全局变量number
          for _ in range(1000000):   # 进行一个大数级别的循环加一运算
              number += 1
          print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
      for i in range(2):   # 用2个子线程,就可以观察到脏数据
          t = threading.Thread(target=plus)
          t.start()
      
      time.sleep(3)    # 等待3秒,确保2个子线程都已经结束运算
      print("主线程执行完成后,number = ", number)
      
    • 分析过程:结果并不等于2,000,000,可以很明显地看出脏数据的情况。这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行保护的情况下,就发生了数据错误。如果想获得正确结果,可以使用join()方法,让多线程变成顺序执行,如下修改代码片段
      import threading
      import time
      
      number = 0
      
      def plus():
          global number    # global声明此处的number是外面的全局变量number
          for _ in range(1000000):    # 进行一个大数级别的循环加一运算
              number += 1
          print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
      
      for i in range(2):   # 用2个子线程,就可以观察到脏数据
          t = threading.Thread(target=plus)
          t.start()
          t.join()   # 添加这一行就让两个子线程变成了顺序执行!!!!!
      
      time.sleep(3)   # 等待3秒,确保2个子线程都已经结束运算
      print("主线程执行完成后,number = ", number)
      
    • 上面为了防止脏数据而使用join()的方法,其实是让多线程变成了单线程,属于因噎废食的做法,正确的做法是使用线程锁。Python在threading模块中定义了几种线程锁类,分别是:
      • Lock 互斥锁
      • RLock 可重入锁
      • Semaphore 信号
      • Event 事件
      • Condition 条件
      • Barrier “阻碍”

    3.1 互斥锁

    • 互斥锁是一种独占锁,同一时刻只有一个线程可以访问共享的数据。使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。
      import threading
      import time
      
      number = 0
      
      lock = threading.Lock()  # 锁对象!
      
      def plus(lk):
          global number        # global声明此处的number是外面的全局变量number
          lk.acquire()  # 开始加锁!!!
          for _ in range(1000000):   # 进行一个大数级别的循环加一运算
              number += 1
          print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
          lk.release()   # 释放锁,让别的线程也可以访问number!!!
          
          
      for i in range(2):   # 用2个子线程,就可以观察到脏数据
          t = threading.Thread(target=plus, args=(lock,))
          t.start()
      
      time.sleep(3)    # 等待3秒,确保2个子线程都已经结束运算
      print("主线程执行完成后,number = ", number)
      
    • RLock的使用方法和Lock一模一样,只不过它支持重入锁。该锁对象内部维护着一个Lock和一个counter对象。counter对象记录了acquire的次数,使得资源可以被多次require。最后,当所有RLock被release后,其他线程才能获取资源。在同一个线程中,RLock.acquire()可以被多次调用,利用该特性,可以解决部分死锁问题。

    3.2 信号Semaphore

    • 类名:BoundedSemaphore。这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。
      import time
      import threading
      
      def run(n, se):
          se.acquire()
          print("run the thread: %s" % n)
          time.sleep(1)
          se.release()
      
      # 设置5个线程允许同时运行
      semaphore = threading.BoundedSemaphore(5)
      for i in range(20):
          t = threading.Thread(target=run, args=(i, semaphore))
          t.start()
      

    3.3 事件Event

    • 类名Event, 事件线程锁的运行机制:全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。事件主要提供了四个方法set()、wait()、clear()和is_set()
      • clear()方法会将事件的Flag设置为False
      • set()方法会将Flag设置为True
      • wait()方法将等待“红绿灯”信号
      • is_set():判断当前是否"绿灯放行"状态
    • 下面是一个模拟红绿灯,然后汽车通行的例子:
      import threading
      import time
      
      event = threading.Event()
      
      def lighter():
          green_time = 5  # 绿灯时间
          red_time = 5   # 红灯时间
          event.set()   # 初始设为绿灯
          while True:
              print("绿灯亮...")
              time.sleep(green_time)
              event.clear()
              print("红灯亮...")
              time.sleep(red_time)
              event.set()
      
      def run(name):
          while True:
              if event.is_set():    # 判断当前是否"放行"状态
                  print("一辆[%s] 呼啸开过..." % name)
                  time.sleep(1)
              else:
                  print("一辆[%s]开来,看到红灯,无奈的停下了..." % name)
                  event.wait()
                  print("[%s] 看到绿灯亮了,瞬间飞起....." % name)
      
      lighter = threading.Thread(target=lighter,)
      lighter.start()
      
      for name in ['奔驰', '宝马', '奥迪']:
          car = threading.Thread(target=run, args=(name,))
          car.start()
      

    3.4 条件Condition

    • 类名:Condition。Condition称作条件锁,依然是通过acquire()/release()加锁解锁。
      • wait([timeout])方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
      • notify()方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池),其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
      • notifyAll()方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
      import threading
      import time
      
      num = 0
      con = threading.Condition()
      
      class Foo(threading.Thread):
      
          def __init__(self, name, action):
              super(Foo, self).__init__()
              self.name = name
              self.action = action
      
          def run(self):
              global num
              con.acquire()
              print("%s开始执行..." % self.name)
              while True:
                  if self.action == "add":
                      num += 1
                  elif self.action == 'reduce':
                      num -= 1
                  else:
                      exit(1)
                  print("num当前为:", num)
                  time.sleep(1)
                  if num == 5 or num == 0:
                      print("暂停执行%s!" % self.name)
                      con.notify()
                      con.wait()
                      print("%s开始执行..." % self.name)
              con.release()
      
      if __name__ == '__main__':
          a = Foo("线程A", 'add')
          b = Foo("线程B", 'reduce')
          a.start()
          b.start()
      

    3.5 定时器Timer

    • 定时器Timer类是threading模块中的一个小工具,用于指定n秒后执行某操作。一个简单但很实用的东西。
      from threading import Timer
      
      def hello():
          print("hello world")
      
      # 表示1s后执行hello函数
      t = Timer(1, hello)
      t.start()
      

    3.6 通过with语句使用线程锁

    • 所有的线程锁都有一个加锁和释放锁的动作,非常类似文件的打开和关闭。在加锁后,如果线程执行过程中出现异常或者错误,没有正常的释放锁,那么其他的线程会造到致命性的影响。通过with上下文管理器,可以确保锁被正常释放。其格式如下:
      with some_lock:
         # 执行任务.... 
      
    • 这相当于:
      ome_lock.acquire()
      try:
          # 执行任务..
      finally:
          some_lock.release()
      

    4. 全局解释器锁(GIL)

    • 在大多数环境中,单核CPU情况下,本质上某一时刻只能有一个线程被执行。多核CPU时,则可以支持多个线程同时执行。但是在Python中,无论CPU有多少核,同时只能执行一个线程,这是由于GIL的存在导致的
    • GIL的全称是Global Interpreter Lock(全局解释器锁),是Python设计之初为了数据安全所做的决定。Python中的某个线程想要执行,必须先拿到GIL。可以把GIL看作是执行任务的“通行证”,并且在一个Python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在CPython解释器中才有,因为CPython调用的是c语言的原生线程,不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。在PyPy和JPython中没有GIL
    • Python多线程的工作流程:
      • a.拿到公共数据
      • b.申请GIL
      • c.Python解释器调用操作系统原生线程
      • d.CPU执行运算
      • e.当该线程执行一段时间消耗完,无论任务是否已经执行完毕,都会释放GIL
      • f.下一个被CPU调度的线程重复上面的过程
    • Python针对不同类型的任务,多线程执行效率是不同的:
      • 对于CPU密集型任务(各种循环处理、计算等等): 由于计算工作多,ticks计数很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换是需要消耗资源的),所以Python下的多线程对CPU密集型任务并不友好
      • IO密集型任务(文件处理、网络通信等涉及数据读写的操作): 多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以Python的多线程对IO密集型任务比较友好。
      • 实际中使用的建议:Python中想要充分利用多核CPU,就用多进程。因为每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行。在Python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)同时建议在IO密集型任务中使用多线程,在计算密集型任务中使用多进程。另外,深入研究Python的协程机制,你会有惊喜的。

    5.博客原文

    相关文章

      网友评论

        本文标题:Python3多线程threading介绍(转载)

        本文链接:https://www.haomeiwen.com/subject/ntkrsctx.html