美文网首页
Python基础学习之四多线程

Python基础学习之四多线程

作者: 萧十一郎456 | 来源:发表于2019-02-21 12:28 被阅读0次

    1.线程的基本概念

    1.1 线程

    线程是应用程序最小的执行单元,线程与进程类似,进程可以看做程序的一次执行,而线程就是这个程序的各个功能,比如打开修图软件,就是一个进程,而修图软件的滤镜、虚化等功能可以看做线程。一个进程内部可以有一个到多个线程。所有的线程运行在一个进程中,共享一个内部环境,所以线程时间可以共享数据

    2.threading

    Python提供多线程编程的模块有thread和threading。thread模块提供了基本的线程和锁的支持,而threading模块提供了更高级别,功能更强的线程管理的功能。不建议使用低级别的thread模块,更高级别的threading更为先进,对线程的支持更为完善。而且thread对于你的进程什么时候应该结束完全没有控制,当主线程结束时,所有的线程都会被强制结束掉,没有警告也不会有正常的清除工作。

    2.1 threading模块中的函数和类

    函数有下:

    • active_count():返回当前运行的线程对象的数目

    • current_thread():返回当前Thread对象,对应的调用者的线程控制

    • enumerate():返回当前运行的线程对象的列表

    • main_thread():返回主要线程,一般情况下,主要线程是从Python解释器开始的线程

    类:

    • Thread:表示运行在单独线程控制中的一个活动,一个线程的执行对象。

    • Lock:锁原语对象,实现原始锁对象的类。一旦线程已经获得锁定,则随后尝试获取锁定,直到它被释放; 任何线程都可能会释放它。

    • RLock: 可重入锁是同步原语,可以由同一个线程多次获取。一旦线程获得了可重入锁,同一个线程可能会再次获取锁定; 每次线程必须释放它一次。

    • Condition: 该类实现条件变量对象。条件变量允许一个或多个线程等待,直到被另一个线程通知。

    • Event: 这是线程之间通信的最简单的机制之一,一个线程发出一个事件,其他线程等待它

    • Semaphore:一个信号量管理一个内部计数器,它由每个acquire()调用递减,并由每个调用递增release()。计数器永远不会低于零;当acquire() 发现它为零时,它阻塞,等待直到其他一些线程调用 release()。

    • Timer:这个类表示一个动作,只有经过一定的时间后才能运行。

    2.2 threading.Thread

    Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

    group:应为None

    target:被run()方法调用的可调用对象,可以传入函数等可调用对象

    name:线程名

    args:传入到target的参数元组

    kwargs:传入都target的参数字典

    使用Thread两种方法,一种是创建Thread实例,调用start()方法;另一种是继承Thread类,在子类中重写run()和init()方法。

    import time

    import threading

    def hello_thread(name):

     print('Starting {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))

     time.sleep(3)

     print('End {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))

    if __name__ == '__main__':

     print('Satring {}, Time: {}'.format(threading.current_thread().name, time.ctime()))

     nums = ['One', 'Two', 'Three', 'Four', 'Five']

     threads = []

     for n in nums:

     t = threading.Thread(target=hello_thread, args=(n,))

     threads.append(t)

     for th in threads:

     th.start()

     for th in threads:

     th.join()

     print('End {}, Time: {}'.format(threading.current_thread().name, time.ctime()))

    Satring MainThread, Time: Sun Sep 3 11:50:30 2017

    Starting Thread-4--->One, Time: Sun Sep 3 11:50:30 2017

    Starting Thread-5--->Two, Time: Sun Sep 3 11:50:30 2017Starting Thread-6--->Three, Time: Sun Sep 3 11:50:30 2017

    Starting Thread-7--->Four, Time: Sun Sep 3 11:50:30 2017

    Starting Thread-8--->Five, Time: Sun Sep 3 11:50:30 2017

    End Thread-8--->Five, Time: Sun Sep 3 11:50:33 2017End Thread-6--->Three, Time: Sun Sep 3 11:50:33 2017

    End Thread-7--->Four, Time: Sun Sep 3 11:50:33 2017End Thread-4--->One, Time: Sun Sep 3 11:50:33 2017End Thread-5--->Two,Time: Sun Sep 3 11:50:33 2017End MainThread, Time: Sun Sep 3 11:50:33 2017

    输出结果混在了一起,因为标准输出是共享资源,造成混乱,所以需要加锁。

    import time

    import threading

    th_lock = threading.Lock()

    def hello_thread(name):

     # 获取锁

     th_lock.acquire()

     print('Starting {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))

     time.sleep(3)

     print('End {}--->{}, Time: {}'.format(threading.current_thread().name, name, time.ctime()))

     # 释放锁

     th_lock.release()

    if __name__ == '__main__':

     print('Satring {}, Time: {}'.format(threading.current_thread().name, time.ctime()))

     nums = ['One', 'Two', 'Three', 'Four', 'Five']

     threads = []

     for n in nums:

     t = threading.Thread(target=hello_thread, args=(n,))

     threads.append(t)

     for th in threads:

     th.start()

     for th in threads:

     th.join()

     print('End {}, Time: {}'.format(threading.current_thread().name, time.ctime()))

    Satring MainThread, Time: Sun Sep 3 15:24:45 2017Starting Thread-4--->One, Time: Sun Sep 3 15:24:45 2017

    End Thread-4--->One, Time: Sun Sep 3 15:24:48 2017Starting Thread-5--->Two, Time: Sun Sep 3 15:24:48 2017

    End Thread-5--->Two, Time: Sun Sep 3 15:24:51 2017

    Starting Thread-6--->Three, Time: Sun Sep 3 15:24:51 2017

    End Thread-6--->Three, Time: Sun Sep 3 15:24:54 2017

    Starting Thread-7--->Four, Time: Sun Sep 3 15:24:54 2017

    End Thread-7--->Four, Time: Sun Sep 3 15:24:57 2017

    Starting Thread-8--->Five, Time: Sun Sep 3 15:24:57 2017

    End Thread-8--->Five, Time: Sun Sep 3 15:25:00 2017End MainThread, Time: Sun Sep 3 15:25:00 2017

    一个线程结束后,马上开始新的线程。

    继承Thread.Threading

    import threading

    from time import time, sleep

    class MyThreading(threading.Thread):

     def __init__(self, thread_id, thread_name):

     threading.Thread.__init__(self)

     self.thread_id = thread_id

     self.thread_name = thread_name

     def run(self):

     print('Thread {} , Name {}, Start'.format(self.thread_name, self.thread_id))

     sleep(1)

     print('Thread End')

    if __name__ == '__main__':

     print('Begining')

     t1 = MyThreading(1, 'Threading-1')

     t2 = MyThreading(2, 'Threading-2')

     t1.start()

     t2.start()

     t1.join()

     t2.join()

     print('All Done!')

    Begining

    Thread Threading-1 , Name 1, Start

    Thread Threading-2 , Name 2, Start

    Thread EndThread End

    All Done!

    外部传入线程运行的函数

    import time 

    import threading

    loops = ['one', 'two']

    class MyThread(threading.Thread):

     def __init__(self, target, args):

     super(MyThread, self).__init__()

     self.target = target

     self.args = args

     def run(self):

     self.target(*self.args)

    def output(nloop, nesc):

     print('Start loop, "{}", at: {}'.format(nloop, time.ctime()))

     time.sleep(nesc)

     print('End loop, "{}", at: {}'.format(nloop, time.ctime()))

    if __name__ == '__main__':

     print('Main Threading')

     nloop = range(len(loops))

     threads = []

     for i in nloop:

     my_thread = MyThread(output, (loops[i], i))

     threads.append(my_thread)

     for th in threads:        

     th.start()

     for th in threads:

     th.join()

     print('All Done')

    Main ThreadingStart loop, "one", at: Sun Sep 3 16:54:43 2017

    End loop, "one", at: Sun Sep 3 16:54:43 2017

    Start loop, "two", at: Sun Sep 3 16:54:43 2017

    End loop, "two", at: Sun Sep 3 16:54:44 2017

    All Done

    创建线程的时候传入一个类,这样可以使用类的强大功能,可以保存更多的信息,方法更灵活。

    from threading import Thread

    from time import sleep, ctime

    loops = [4, 2]

    class ThreadFunc(object):

     def __init__(self, func, args, name=""):

     self.name = name

     self.func = func

     self.args = args

     def __call__(self):

     # 创建新线程的时候,Thread 对象会调用我们的 ThreadFunc 对象,这时会用到一个特殊函数 __call__()。

     self.func(*self.args)

    def loop(nloop, nsec):

     print('start loop %s at: %s' % (nloop, ctime()))

     sleep(nsec)

     print('loop %s done at: %s' % (nloop, ctime()))

    def main():

     print('starting at:', ctime())

     threads = []

     nloops = range(len(loops))

     for i in nloops:

     t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))

     threads.append(t)

     for i in nloops:

     threads[i].start()

     for i in nloops:

     threads[i].join() 

     print('all DONE at:', ctime())

    if __name__ == '__main__':

     main()

    starting at: Sun Sep 3 17:33:51 2017

    start loop 0 at: Sun Sep 3 17:33:51 2017

    start loop 1 at: Sun Sep 3 17:33:51 2017

    loop 1 done at: Sun Sep 3 17:33:53 2017

    loop 0 done at: Sun Sep 3 17:33:55 2017all DONE at:

     Sun Sep 3 17:33:55 2017

    总结:threading.Thread()类创建线程,实际上就像老师给学生分配任务一样,你做什么,他做什么,她做什么,我做什么。在Python中分配的任务以函数或者类的形式体现,所以创建多线程会给threading.Thread指定一个函数或者类,相当与指定任务,传入参数则相当与老师给你一些材料,用这些材料完成任务。因此,可以看到创建多线程时指定函数、指定类,有的还会继承threading.Thread,添加一些功能,再指定函数或者类。start()方法用来启动线程,start()告诉run()函数运行线程,所以继承threading.Thread时需要重写run()方法。join()方法用以阻塞当前线程,就是告诉当前线程,调用join()方法的线程不执行完,你就不能执行。

    2.3 Lock

    线程共享数据,因此多个线程对同一数据修改可能会发生冲突,因此需要Lock。当一个线程获取Lock时,相当于告诉其他线程,数据我正在修改,你不能动,等我释放之后,你才可以。

    import time, threading

    balance = 0

    def change_it(n):

     global balance

     balance = balance + n

     balance = balance - n

    def run_thread(n):

     for i in range(100000):

     change_it(n)

    t1 = threading.Thread(target=run_thread, args=(5,))

    t2 = threading.Thread(target=run_thread, args=(8,))

    t1.start()

    t2.start()

    t1.join()

    t2.join()

    print(balance)

    5

    多次执行后,会出现不为0的情况,因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

    import time, threading

    balance = 0

    lock = threading.Lock()

    def change_it(n):

     global balance

     balance = balance + n

     balance = balance - n

    def run_thread(n):

     for i in range(100000):

     try:

     # 获取锁

     lock.acquire()

     change_it(n)

     finally:

     # 释放锁

     lock.release()

    t1 = threading.Thread(target=run_thread, args=(5,))

    t2 = threading.Thread(target=run_thread, args=(8,))

    t1.start()

    t2.start()

    t1.join()

    t2.join()

    print(balance)

    0

    2.4 Condition

    条件变量对象能让一个线程停下来,等待其他线程满足了某个条件。条件变量允许一个或多个线程等待,直到被另一个线程通知。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。

    1. 当一个线程获取锁后,发现没有相应的资源或状态,就会调用wait阻塞,释放已经获得的锁,直到期望的资源或者状态发生改变。

    2. 当一个线程获得了锁,改变了资源或者状态,就会调用notify()或者notifyall()去通知其他线程。

    方法:

    acquire():获得锁

    release

    ():释放锁

    wait

    ([timeout]):持续等待直到被notify()或者notifyAll()通知或者超时(必须先获得锁)

    wait

    ():所做操作, 先释放获得的锁, 然后阻塞, 知道被notify或者notifyAll唤醒或者超时, 一旦被唤醒或者超时, 会重新获取锁(应该说抢锁), 然后返回

    notify

    ():唤醒一个wait()阻塞的线程

    notify_all

    ()或者notifyAll():唤醒所有阻塞的线程

    from threading import Thread, current_thread, Condition

    from time import sleep

    con = Condition()

    def th_con():

     with con:

     for i in range(5):

     print('Name: {}, Times: {}'.format(current_thread().name, i))

     sleep(0.3)

     if i == 3:

     print('Release Lock, Wait')

     # 只有获取锁的线程才能调用 wait() 和 notify(),因此必须在锁释放前调用 

     con.wait()

    def th_con2():

     with con:

     for i in range(5):

     print('Name: {}, Times: {}'.format(current_thread().name, i))

     sleep(0.3)

     if i == 3:                

     con.notify()

     print('Notify Thread')

    if __name__ == '__main__':

     Thread(target=th_con, name='Thread>>>One').start()

     Thread(target=th_con2, name='Thread<<

    Name: Thread>>>One, Times: 0

    Name: Thread>>>One, Times: 1

    Name: Thread>>>One, Times: 2

    Name: Thread>>>One, Times: 3

    Release Lock, Wait

    Name: Thread<<

    Name: Thread<<

    Name: Thread<<

    Name: Thread<<

    Notify Thread

    Name: Thread<<

    Name: Thread>>>One, Times: 4

    2.5 Event

    事件用于在线程间通信。一个线程发出一个信号,其他一个或多个线程等待,调用event对象的wait方法,线程则会阻塞等待,直到别的线程set之后,才会被唤醒。

    import time

    import threading

    class MyThread(threading.Thread):

     def __init__(self, event):

     super(MyThread, self).__init__()

     self.event = event

     def run(self):

     print('Thread {} is ready'.format(self.getName()))

     self.event.wait()

     print('Thread {} run'.format(self.getName()))

    signal = threading.Event()

    def main():

     start = time.time()

     for i in range(3):

     t = MyThread(signal)

     t.start()

     time.sleep(3)

     print('After {}s'.format(time.time() - start))

     # 将内部标志设置为True,等待标识的其他线程都会被唤醒

     signal.set()

    if __name__ == '__main__':

     main()

    Thread Thread-4 is ready

    Thread Thread-5 is ready

    Thread Thread-6 is ready

    After 3.0065603256225586sThread Thread-4 run

    Thread Thread-6 run

    Thread Thread-5 run

    3.queue

    queue用于线程间通信,让各个线程之间共享数据。Queue实现的三种队列模型:

    • FIFO(先进先出)队列,第一加入队列的任务, 被第一个取出

    • LIFO(后进先出)队列,最后加入队列的任务, 被第一个取出

    • PriorityQueue(优先级)队列, 保持队列数据有序, 最小值被先取出

    queue实现的类和异常:

    qsize():返回队列的大致大小

    empty

    ():如果队列为空,则返回True

    full

    ():如果队列满,则返回True

    put

    ():往Queue加入元素

    get

    ():从Queue中删除并返回一个项目

    join

    ():阻塞一直到Queue中的所有元素被获取和处理

    task_done

    ():表明以前入队的任务已经完成。由队列消费者线程使用。对于每个get()用于获取任务的后续调用, task_done()告知队列对任务的处理完成。

    生产者和消费者模型

    某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。

    import time

    import threading

    import queue

    import random

    class Producer(threading.Thread):

     def __init__(self, name, q):

     threading.Thread.__init__(self, name=name)

     self.data = q

     def run(self):

     for i in range(10):

     elem = random.randrange(100)

     self.data.put(elem)

     print('{} a elem {}, Now the size is {}'.format(self.getName(), elem, self.data.qsize()))

     time.sleep(random.random())

     print('Thread {}, {} is finished!!!'.format(threading.current_thread().name, self.getName()))

    class Consumer(threading.Thread):

     def __init__(self, name, q):

     threading.Thread.__init__(self, name=name)

     self.data = q

     def run(self):

     for i in range(10):

     elem = self.data.get()

     self.data.task_done()

     print('{} a elem {}, Now the size is {}'.format(self.getName(), elem, self.data.qsize()))

     time.sleep(random.random())

     print('Thread {}, {} is finished!!!'.format(threading.current_thread().name, self.getName()))

    def main():

     print('Start Pro')

     q = queue.Queue()

     producer = Producer('Producer', q)

     consumer = Consumer('Consumer', q)

     producer.start()

     consumer.start()

     producer.join()

     consumer.join()

    #  threads_pro = []

    #  threads_con = []

    #  for i in range(3):

    #  producer = Producer('Producer', q)

    #  threads_pro.append(producer)

    #  for i in range(3):

    #  consumer = Consumer('Consumer', q)

    #  threads_con.append(consumer)

    #  for th in threads_pro:

    #  th.start()

    #  for th in threads_con:

    #  th.start()

    #  for th in threads_pro:

    #  th.join()

    #  for th in threads_con:

    #  th.join()

     print('All Done!!!')

    if __name__ == '__main__':

     main()

    Start Pro

    Producer a elem 89, Now the size is 1

    Consumer a elem 89, Now the size is 0

    Producer a elem 26, Now the size is 1Consumer a elem 26, Now the size is 0

    Producer a elem 51, Now the size is 1Consumer a elem 51, Now the size is 0

    Producer a elem 41, Now the size is 1Consumer a elem 41, Now the size is 0

    Producer a elem 29, Now the size is 1Consumer a elem 29, Now the size is 0

    Producer a elem 63, Now the size is 1

    Consumer a elem 63, Now the size is 0

    Producer a elem 56, Now the size is 1Consumer a elem 56, Now the size is 0

    Producer a elem 31, Now the size is 1

    Consumer a elem 31, Now the size is 0

    Producer a elem 21, Now the size is 1

    Consumer a elem 21, Now the size is 0

    Producer a elem 67, Now the size is 1

    Consumer a elem 67, Now the size is 0

    Thread Producer, Producer is finished!!!

    Thread Consumer, Consumer is finished!!!

    All Done!!!

    4.ThreadLocal

    一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。它本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的。

    import threading

    # 创建全局ThreadLocal对象:

    local_school = threading.local()

    def process_student():

     # 获取当前线程关联的student:

     std = local_school.student

     print('Hello, %s (in %s)' % (std, threading.current_thread().name))

    def process_thread(name):

     # 绑定ThreadLocal的student:

     local_school.student = name

     process_student()

    t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')

    t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')

    t1.start()

    t2.start()

    t1.join()

    t2.join()

    Hello, Alice (in Thread-A)

    Hello, Bob (in Thread-B)

    相关文章

      网友评论

          本文标题:Python基础学习之四多线程

          本文链接:https://www.haomeiwen.com/subject/cudryqtx.html