美文网首页python
Python基础(十二)并发编程

Python基础(十二)并发编程

作者: 鹊南飞_ | 来源:发表于2019-10-24 18:41 被阅读0次

    1. 线程概述

    几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每一个运行中的程序就是一个进程。当一个程序进入内存运行时,即变成一个进程。进程是处于运行过程中的程序,并且具有一定的独立功能。进程是系统进行资源分配和调度的一个独立单位。
    一般而言,进程包含以下三个特征。

    • 独立性: 进程是系统中独立存在的实体,它可以拥有自己的独立的资源,每一个进程都拥有自己的私有的地址空间。在没有经过进程本身允许的情况下,一个用户进程不可以直接访问其他进程的地址空间。
    • 动态性:进程与程序的区别在于,程序只是一个静态的指令集合,而进程是一个正在系统中活动的指令集合。在进程中加入了时间的概念。进程具有自己的生命周期和各种不同的状态,在程序中是没有这些概念的。
    • 并发性:多个进程可以在单个处理器上并发执行,多个进程之间不会互相影响。

    并发和并行是两个概念,并行指在同一个时刻有多条指令在多个处理器上同时执行;并发指在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行,使得宏观上具有多个进程同时执行的效果。

    操作系统可以同时执行多个任务,每一个任务就是一个进程;进程可以同时执行多个任务,每一个任务就是一个线程。
    使用多线程编程具有以下几个优点:

    • 进程之间不能共享内存,线程之间共享内存非常容易。
    • 使用多线程来实现多任务并发执行比使用多进程的效率高
    • Python语言内置了多线程功能支持

    2. 线程的创建和启动

    threading模块

    • 使用threading模块的Thread类的构造器创建线程
    • 继承threading模块的Thread类创建线程类

    1. 使用threading模块的Thread类的构造器创建线程

    构造器如下

    __init__(self, group=None,target=None,name=None.args=().kwargs=None, * , daemon=None)
    
    构造器参数

    步骤如下

    • 使用构造器创建线程对象
    • 调用start()方法启动该线程

    示例代码如下:

    import threading
    
    
    def dog(num):
        print(num)
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=dog, args=(10,))
        t1.start()
        t2 = threading.Thread(target=dog, args=(20,))
        t2.start()
    
    

    • 在进行多线程编程时,不要忘记Python程序运行时默认的主线程。
    • 多线程就是让多个函数并发执行,让普通用户感觉到多个函数似乎同时在执行。

    2. 继承threading模块的Thread类创建线程类

    步骤如下:

    • 定义Thread类的子类,重写该类的run()方法。
    • 创建Thread子类的示例,即创建线程对象。
    • 调用线程对象的start()方法来启动线程。

    示例代码如下:

    import threading
    
    
    class Dog(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):
            print(self.num)
    
    
    if __name__ == '__main__':
        t1 = Dog(10)
        t1.start()
        t2 = Dog(20)
        t2.start()
    
    

    推荐使用第一种方式来创建进程,因为编程简单,同时具有更清晰的逻辑结构

    3. 线程的生命周期

    • 新建(New)
    • 就绪(Ready)
    • 运行(Running)
    • 堵塞(Blocked)
    • 死亡(Dead)

    1. 新建和就绪状态

    创建一个Thread对象或Thread子类的对象之后,该线程就处于新建状态。
    当线程对象调用了start()方法后,该线程就处于就绪状态。


    注意
    注意

    2. 运行和堵塞状态

    当一个线程开始运行后,它不可能一直处于运行状态(除非它的线程执行体足够短,瞬间就结束了),线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调度的细节取决于底层平台所采用的策略。
    进行阻塞状态的情况:

    • 线程调用了sleep()方法主动放弃所占用的处理器资源
    • 线程调用了一个阻塞式I/O方法,在该方法返回之前,该线程被阻塞
    • 线程试图获得一个锁对象,但该锁对象正被其他线程所持有
    • 线程在等待某个通知

    解除阻塞,重新进入就绪状态额情况

    • 调用sleep()方法的线程经过了阻塞的时间
    • 线程调用的阻塞式I/O方法已经返回
    • 线程成功地获得了试图获取的锁对象
    • 线程正在等待某个通知时,其他线程发出了一个通知
    线程状态转换图

    3. 线程死亡

    线程处于死亡状态:

    • run()方法或代表线程执行体的target函数执行完成,线程正常结束
    • 线程抛出一个未捕获的Exception或Error


      注意
      注意
      注意

    4. 控制线程

    1. join线程

    Thread提供了让一个线程等待另一个线程完成的方法join()方法。
    当某个程序执行六中调用其他线程的join()方法时,调用线程将被阻塞,直到join方法加入的join线程执行完成。
    join(timeout=None)方法可以指定一个timeout参数,该参数指定等待被join线程的时间最长为timeout秒。如果在timeout秒内被join的线程还没有执行结束,则不再等待。

    import threading
    
    
    def dog(num):
        for i in range(num):
            print('{} {}'.format(threading.current_thread().name, i))
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=dog, args=(5,))
        t1.start()
        t1.join()
        t2 = threading.Thread(target=dog, args=(10,))
        t2.start()
        t2.join()
        print('执行结束')
    

    加入join后,t2只会在t1执行后才开始运行,运行结果为

    Thread-1 0
    Thread-1 1
    Thread-1 2
    Thread-1 3
    Thread-1 4
    Thread-2 0
    Thread-2 1
    Thread-2 2
    Thread-2 3
    Thread-2 4
    Thread-2 5
    Thread-2 6
    Thread-2 7
    Thread-2 8
    Thread-2 9
    执行结束
    

    去掉join之后

    import threading
    
    
    def dog(num):
        for i in range(num):
            print('{} {}'.format(threading.current_thread().name, i))
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=dog, args=(5,))
        t1.start()
        # t1.join()
        t2 = threading.Thread(target=dog, args=(10,))
        t2.start()
        # t2.join()
        print('执行结束')
    

    去掉join之后,运行结果

    Thread-1 0
    Thread-1 1
    Thread-1 2
    Thread-1 3
    Thread-1 4
    Thread-2 0
    Thread-2 1
    执行结束
    Thread-2 2
    Thread-2 3
    Thread-2 4
    Thread-2 5
    Thread-2 6
    Thread-2 7
    Thread-2 8
    Thread-2 9
    

    2. 后台线程

    有一种线程,它是后台运行的,它的任务是为其他线程提供服务,这种线程被称为“后台线程”(Daemon Thread),又称为“守护线程”或“精灵线程”。Python解释器的垃圾回收线程就是典型的后台线程。
    后台线程有一个特征:如果所有的前台线程都死亡了,那么后台线程就会自动死亡。
    调用Thread对象的daemon属性可以将指定线程设置成后台线程。

    import threading
    
    
    def dog(num):
        for i in range(num):
            print('{} {}'.format(threading.current_thread().name, i))
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=dog, args=(1000,))
        # 将t1设置为后台线程,然后启动该线程
        # 该线程本应该运行到第1000次循环,但无法运行到1000
        # 因为程序中的主线程也就是唯一的前台线程结束后,程序会主动退出,后台线程也会被结束了
        t1.daemon = True
        t1.start()
        t2 = threading.Thread(target=dog, args=(10,))
        t2.start()
        print('执行结束')
    

    运行结果

    # 省略之前的结果
    Thread-2 4
    Thread-2 5
    Thread-2 6
    Thread-2 7
    Thread-1 29
    Thread-1 30
    执行结束
    Thread-2 8
    Thread-1 31
    Thread-2 9
    

    创建后台线程有两种方式:

    • 主动将线程的daemon属性设置为True
    • 后台线程启动的线程默认是后台线程
    注意

    3. 线程睡眠

    如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,可以调用time模块的sleep(secs)函数来实现,该函数可以指定一个secs参数,用于指定线程阻塞多少秒

    import time
    
    for i in range(10):
        local_time = time.localtime(time.time())
        print(time.strftime("%Y-%m-%d %H:%M:%S", local_time))
        time.sleep(2)
    

    可以看出每隔两秒,就输出一条记录

    5. 线程同步

    1. 线程安全问题

    经典的问题:银行取钱问题


    银行取钱问题步骤
    import threading
    import time
    
    
    # 账户类
    class Account:
        def __init__(self, account_no, balance):
            # 账户编号和账户余额
            self.account_no = account_no
            self.balance = balance
    
    
    # 取钱操作
    def draw(account, draw_amount):
        # 账户余额大于取钱数
        if account.balance >= draw_amount:
            time.sleep(0.001)
            account.balance -= draw_amount
            print('{}取钱成功,余额为{}'.format(threading.current_thread().name, account.balance))
        else:
            print('取钱失败,余额不足')
    
    
    acct = Account('abc', 1000)
    t1 = threading.Thread(name='小红', target=draw, args=(acct, 800))
    t2 = threading.Thread(name='小明', target=draw, args=(acct, 800))
    t1.start()
    t2.start()
    

    输出结果

    小明取钱成功,余额为200
    小红取钱成功,余额为-600
    

    虽然是程序中人为使用time.sleep(0.001)来强制线程调度切换,但这种切换也是完全可能发生的。

    2. 同步锁

    Python的threading模块引入了锁(lock),提供了Lock和RLock两个类


    同步锁
    通用Rlock的代码格式

    修改以上代码

    import threading
    import time
    
    
    # 账户类
    class Account:
        def __init__(self, account_no, balance):
            # 账户编号和账户余额
            self.account_no = account_no
            self._balance = balance
            # 增加锁
            self.lock = threading.RLock()
    
        def get_balance(self):
            return self._balance
    
        # 取钱操作
        def draw(self, draw_amount):
            # 加锁
            self.lock.acquire()
            try:
                # 账户余额大于取钱数
                if self._balance >= draw_amount:
                    time.sleep(0.001)
                    self._balance -= draw_amount
                    print('{}取钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
                else:
                    print('{}取钱失败,余额不足'.format(threading.current_thread().name))
            finally:
                self.lock.release()
    
    
    # 取钱操作
    def draw(account, draw_amount):
        account.draw(draw_amount)
    
    
    acct = Account('abc', 1000)
    t1 = threading.Thread(name='小红', target=draw, args=(acct, 800))
    t2 = threading.Thread(name='小明', target=draw, args=(acct, 800))
    t1.start()
    t2.start()
    

    输出结果

    小红取钱成功,余额为200
    小明取钱失败,余额不足
    

    安全访问逻辑: 加锁,修改, 释放锁


    提示

    可变类的线程是以降低程序的运行效率作为代价的,为了减少线程安全所带来的的负面影响,程序可以采用如下策略:

    • 不要对线程安全类的所有方法都进行永不,只对那些会改变竞争资源(共享资源)的方法进行同步。(如上述例子中的account_no实例变量就无需同步)
    • 如果可变类有两种运行环境:单线程环境和多线程环境,则应该为该可变类提供两种版本,即线程不安全版本和线程安全版本,在单线程环境中使用线程不安全版本以保证性能,在多线程环境中使用线程安全版本

    3. 死锁

    当两个线程相互等待对方释放同步监视器时就会出现死锁。
    Python解释器没有检测,也没有采取措施来处理死锁情况,所以在进行多线程编程时应该采取措施来避免死锁。一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程都处于堵塞状态,无法继续。

    避免死锁的常见方式:

    • 避免多次锁定: 尽量避免用一个线程对多个Lock进行锁定。
    • 具用相同加锁顺序:如果多个线程需要对多个Lock进行锁定,则应该保证它们以相同的顺序请求加锁。
    • 使用定时锁: acquire()方法加锁时,可以指定timeout参数
    • 死锁检测:死锁检测是一种依靠算法机制来实现的死锁预防机制,它主要是针对那些不可使用按序加锁和定时锁的场景的。

    6. 线程通信

    1. 使用Condition实现线程通信

    Condition
    import threading
    import time
    
    
    # 账户类
    class Account:
        def __init__(self, account_no, balance):
            # 账户编号和账户余额
            self.account_no = account_no
            self._balance = balance
            self.cond = threading.Condition()
            # 是否存钱的旗标
            self._flag = False
    
        def get_balance(self):
            return self._balance
    
        # 取钱操作
        def draw(self, draw_amount):
            # 加锁
            self.cond.acquire()
            try:
                # sele._flag为True时,等待存钱,取钱方法被堵塞
                if not self._flag:
                    self.cond.wait()
                else:
                    self._balance -= draw_amount
                    print('{}取钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
                    self._flag = False
                    # 唤醒其他线程
                    self.cond.notify_all()
            finally:
                self.cond.release()
    
        # 存钱操作
        def deposit(self, deposit_amount):
            # 加锁
            self.cond.acquire()
            try:
                # sele._flag为False时,等待取钱,存钱方法被堵塞
                if self._flag:
                    self.cond.wait()
                else:
                    self._balance += deposit_amount
                    print('{}存钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
                    self._flag = True
                    # 唤醒其他线程
                    self.cond.notify_all()
            finally:
                self.cond.release()
    
    
    # 取钱操作
    def draw_many(account, draw_amount, max):
        for i in range(max):
            account.draw(draw_amount)
    
    
    # 存钱操作
    def deposit_many(account, deposit_amount, max):
        for i in range(max):
            account.deposit(deposit_amount)
    
    
    acct = Account('abc', 0)
    t1 = threading.Thread(name='取钱A', target=draw_many, args=(acct, 800, 100))
    t2 = threading.Thread(name='存钱B', target=deposit_many, args=(acct, 800, 100))
    t3 = threading.Thread(name='存钱C', target=deposit_many, args=(acct, 800, 100))
    t4 = threading.Thread(name='存钱D', target=deposit_many, args=(acct, 800, 100))
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    

    运行结果

    存钱B存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱C存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱C存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱D存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱D存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱C存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱B存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱B存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱D存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱B存钱成功,余额为800
    取钱A取钱成功,余额为0
    存钱C存钱成功,余额为800
    # 省略之后的结果
    

    2. 使用队列Queue控制线程通信

    三个队列类
    三个队列类的属性和方法
    import queue
    
    # 定义一个长度为2的队列
    q = queue.Queue(2)
    q.put('1')
    print('111')
    q.put('2')
    print('222')
    q.put(3)
    print('333')
    

    结果

    111
    222
    

    队列的长度为2, 使用put()放入两个元素时,顺利执行。放入第三个元素时,会堵塞进程。
    当然,当Queue已空的情况下,使用get()方法也会堵塞进程

    import queue
    import threading
    import time
    
    
    def product(q):
        fruit = ['apple', 'pear', 'banana']
        for i in range(10000):
            time.sleep(0.2)
            # 尝试放入元素,如果队列已满,则会被阻塞
            q.put(fruit[i % 3])
            print('{}生产元组元素'.format(threading.current_thread().name))
    
    
    def consumer(q):
        while True:
            time.sleep(0.2)
            # 尝试取出元素,如果队列已空,则会被阻塞
            t = q.get()
            print('{}消费元素{}'.format(threading.current_thread().name, t))
    
    
    # 创建容量为3的队列
    q = queue.Queue(3)
    # 启动三个生产者线程,一个消费者线程
    t1 = threading.Thread(target=product, args=(q,))
    t2 = threading.Thread(target=product, args=(q,))
    t3 = threading.Thread(target=product, args=(q,))
    t4 = threading.Thread(target=consumer, args=(q,))
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    
    

    输出结果

    Thread-3生产元组元素
    Thread-4消费元素apple
    Thread-1生产元组元素
    Thread-2生产元组元素
    Thread-4消费元素apple
    Thread-2生产元组元素
    Thread-3生产元组元素
    Thread-4消费元素apple
    Thread-1生产元组元素
    Thread-4消费元素pear
    Thread-1生产元组元素
    Thread-4消费元素pear
    Thread-1生产元组元素
    Thread-4消费元素pear
    Thread-1生产元组元素
    # 省略之后的结果
    

    3. 使用Event控制线程通信

    Event是一种非常简单的线程通信机制:一个线程发出一个Event,另一个线程可以通过该Event被触发
    Event提供了以下方法

    • is_set():该方法返回Event的内部旗标是否未True。
    • set(): 该方法会把Event的内部旗标设置为True,并唤醒所有处于等待状态的线程。
    • clear():改方法会把Event的内部旗标设置为False,通常会调用wait()方法来阻塞当前线程。
    • wait(timeout=None):该方法会阻塞当前线程
    import threading
    import time
    
    event = threading.Event()
    
    
    def cal(name):
        print('{}进入阻塞状态'.format(name))
        # 进入阻塞状态
        event.wait()
        print('{}进入运行状态'.format(name))
    
    
    t1 = threading.Thread(target=cal, args=('a',))
    t2 = threading.Thread(target=cal, args=('b',))
    t1.start()
    t2.start()
    time.sleep(2)
    print('-----------')
    event.set()
    

    输出结果

    a进入阻塞状态
    b进入阻塞状态
    -----------
    a进入运行状态
    b进入运行状态
    
    注意

    7. 线程池

    系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好的提供性能。
    线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
    使用线程池还可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降。

    1. 使用线程池

    线程池的基类是concurrent.futures模块中的Executor,Executor有两个子类,其中ProcessPoolExecutor用于创建进程池,ThreadPoolExecutor用于创建线程池。


    Executor
    Future
    线程池执行步骤
    from concurrent.futures import ThreadPoolExecutor
    import threading
    import time
    
    
    def action(max):
        for i in range(max):
            print(threading.current_thread().name + " " + str(i))
    
    
    # 创建一个包含两个线程的线程池
    pool = ThreadPoolExecutor(max_workers=2)
    # 向线程池提交两个任务
    future1 = pool.submit(action, 10)
    future2 = pool.submit(action, 20)
    # 判断任务1是否结束
    print(future1.done())
    time.sleep(3)
    # 判断任务2是否结束
    print(future2.done())
    # 查看返回结果
    print(future1.result())
    print(future2.result())
    # 关闭线程池
    pool.shutdown()
    
    

    输出结果

    ThreadPoolExecutor-0_0 0
    ThreadPoolExecutor-0_0 1
    ThreadPoolExecutor-0_0 2
    ThreadPoolExecutor-0_0 3
    ThreadPoolExecutor-0_0 4
    ThreadPoolExecutor-0_0 5
    ThreadPoolExecutor-0_0 6
    ThreadPoolExecutor-0_0 7
    ThreadPoolExecutor-0_0 8
    ThreadPoolExecutor-0_0 9
    ThreadPoolExecutor-0_0 0
    ThreadPoolExecutor-0_0 1
    ThreadPoolExecutor-0_0 2
    ThreadPoolExecutor-0_0 3
    ThreadPoolExecutor-0_0 4
    ThreadPoolExecutor-0_0 5
    ThreadPoolExecutor-0_0 6
    ThreadPoolExecutor-0_0 7
    ThreadPoolExecutor-0_0 8
    True
    ThreadPoolExecutor-0_0 9
    ThreadPoolExecutor-0_0 10
    ThreadPoolExecutor-0_0 11
    ThreadPoolExecutor-0_0 12
    ThreadPoolExecutor-0_0 13
    ThreadPoolExecutor-0_0 14
    ThreadPoolExecutor-0_0 15
    ThreadPoolExecutor-0_0 16
    ThreadPoolExecutor-0_0 17
    ThreadPoolExecutor-0_0 18
    ThreadPoolExecutor-0_0 19
    True
    None
    None
    

    2. 获取执行结果

    可以用Future的result()来获取返回值,当该方法会阻塞当前主线程。
    可以通过Future的add_done_callback()方法来添加回调函数。当线程完成后,程序会自动触发该回调函数,并将对应的Future对象作为参数传给该回调函数。

    from concurrent.futures import ThreadPoolExecutor
    import threading
    import time
    
    
    def action(max):
        for i in range(max):
            print(threading.current_thread().name + " " + str(i))
        time.sleep(0.5)
        return max
    
    
    def get_result(future):
        print(future.result())
    
    
    # 创建一个包含两个线程的线程池
    pool = ThreadPoolExecutor(max_workers=2)
    # 向线程池提交两个任务
    future1 = pool.submit(action, 10)
    future2 = pool.submit(action, 20)
    # 查看返回结果
    future1.add_done_callback(get_result)
    future2.add_done_callback(get_result)
    print('-----------')
    # 关闭线程池
    pool.shutdown()
    
    

    输出结果

    ThreadPoolExecutor-0_0 0
    ThreadPoolExecutor-0_0 1
    ThreadPoolExecutor-0_0 2
    ThreadPoolExecutor-0_0 3
    ThreadPoolExecutor-0_0 4
    ThreadPoolExecutor-0_1 0
    ThreadPoolExecutor-0_1 1
    ThreadPoolExecutor-0_1 2
    ThreadPoolExecutor-0_1 3
    ThreadPoolExecutor-0_1 4
    ThreadPoolExecutor-0_1 5
    ThreadPoolExecutor-0_1 6
    ThreadPoolExecutor-0_1 7
    -----------
    ThreadPoolExecutor-0_0 5
    ThreadPoolExecutor-0_0 6
    ThreadPoolExecutor-0_0 7
    ThreadPoolExecutor-0_1 8
    ThreadPoolExecutor-0_1 9
    ThreadPoolExecutor-0_1 10
    ThreadPoolExecutor-0_1 11
    ThreadPoolExecutor-0_1 12
    ThreadPoolExecutor-0_1 13
    ThreadPoolExecutor-0_1 14
    ThreadPoolExecutor-0_1 15
    ThreadPoolExecutor-0_1 16
    ThreadPoolExecutor-0_1 17
    ThreadPoolExecutor-0_1 18
    ThreadPoolExecutor-0_1 19
    ThreadPoolExecutor-0_0 8
    ThreadPoolExecutor-0_0 9
    20
    10
    

    8. 线程相关类

    1. 线程局部变量

    threading模块下的local()函数

    import threading
    
    # 定义线程局部变量
    my_data = threading.local()
    
    
    def action(max):
        for i in range(max):
            try:
                my_data.x += i
            except:
                my_data.x = i
            print('{}中mydata.x的值为{}'.format(threading.current_thread().name, my_data.x))
    
    
    t1 = threading.Thread(target=action, args=(10,))
    t2 = threading.Thread(target=action, args=(10,))
    t1.start()
    t2.start()
    
    

    输出结果

    Thread-1中mydata.x的值为0
    Thread-1中mydata.x的值为1
    Thread-1中mydata.x的值为3
    Thread-1中mydata.x的值为6
    Thread-1中mydata.x的值为10
    Thread-1中mydata.x的值为15
    Thread-1中mydata.x的值为21
    Thread-1中mydata.x的值为28
    Thread-1中mydata.x的值为36
    Thread-1中mydata.x的值为45
    Thread-2中mydata.x的值为0
    Thread-2中mydata.x的值为1
    Thread-2中mydata.x的值为3
    Thread-2中mydata.x的值为6
    Thread-2中mydata.x的值为10
    Thread-2中mydata.x的值为15
    Thread-2中mydata.x的值为21
    Thread-2中mydata.x的值为28
    Thread-2中mydata.x的值为36
    Thread-2中mydata.x的值为45
    

    2. 定时器

    Thread类中有一个Timer子类,该子类可用于控制指定函数在特定时间内执行一次。
    只能控制函数在指定时间内执行一次,如果要多次执行,需要再执行下一次调度。
    取消调度,可调用Timer对象的cancel()函数。

    from threading import Timer
    
    
    def hello():
        print('hello')
    
    
    # 10秒后执行hello函数
    t = Timer(10, hello)
    t.start()
    

    输出结果

    hello
    

    3. 任务调度

    Python提供的sched模块


    sched

    9. 多线程

    1. 使用fork创建新进场

    os模块提供了一个fork()方法

    import os
    
    print('父进程(%s)开始执行' % os.getpid())
    # 开始fork一个子进程
    # 从这行代码开始,下面代码都会被两个进程执行
    pid = os.fork()
    print('进程进入:%s' % os.getpid())
    # 如果pid为0,表明子进程
    if pid == 0:
        print('子进程,其ID为 (%s), 父进程ID为 (%s)' % (os.getpid(), os.getppid()))
    else:
        print('我 (%s) 创建的子进程ID为 (%s).' % (os.getpid(), pid))
    print('进程结束:%s' % os.getpid())
    

    输出结果

    父进程(15329)开始执行
    进程进入:15329
    我 (15329) 创建的子进程ID为 (15330).
    进程结束:15329
    进程进入:15330
    子进程,其ID为 (15330), 父进程ID为 (15329)
    

    window上运行会报错
    报错信息为:AttributeError: module 'os' has no attribute 'fork'


    注意

    2. 使用multiprocessing.Process创建新进程

    import multiprocessing
    import os
    
    
    # 定义一个普通的action函数,该函数准备作为进程执行体
    def action(max):
        for i in range(max):
            print("(%s)子进程(父进程:(%s)):%d" %
                  (os.getpid(), os.getppid(), i))
    
    
    if __name__ == '__main__':
        # 下面是主程序(也就是主进程)
        for i in range(40):
            print("(%s)主进程: %d" % (os.getpid(), i))
            if i == 20:
                # 创建并启动第一个进程
                mp1 = multiprocessing.Process(target=action, args=(10,))
                mp1.start()
                # 创建并启动第一个进程
                mp2 = multiprocessing.Process(target=action, args=(10,))
                mp2.start()
                mp2.join()
        print('主进程执行完成!')
    

    输出结果

    (14292)主进程: 0
    (14292)主进程: 1
    (14292)主进程: 2
    (14292)主进程: 3
    (14292)主进程: 4
    (14292)主进程: 5
    (14292)主进程: 6
    (14292)主进程: 7
    (14292)主进程: 8
    (14292)主进程: 9
    (14292)主进程: 10
    (14292)主进程: 11
    (14292)主进程: 12
    (14292)主进程: 13
    (14292)主进程: 14
    (14292)主进程: 15
    (14292)主进程: 16
    (14292)主进程: 17
    (14292)主进程: 18
    (14292)主进程: 19
    (14292)主进程: 20
    (12444)子进程(父进程:(14292)):0
    (18196)子进程(父进程:(14292)):0
    (12444)子进程(父进程:(14292)):1
    (18196)子进程(父进程:(14292)):1
    (12444)子进程(父进程:(14292)):2
    (18196)子进程(父进程:(14292)):2
    (18196)子进程(父进程:(14292)):3
    (12444)子进程(父进程:(14292)):3
    (18196)子进程(父进程:(14292)):4
    (12444)子进程(父进程:(14292)):4
    (18196)子进程(父进程:(14292)):5
    (12444)子进程(父进程:(14292)):5
    (18196)子进程(父进程:(14292)):6
    (12444)子进程(父进程:(14292)):6
    (18196)子进程(父进程:(14292)):7
    (12444)子进程(父进程:(14292)):7
    (18196)子进程(父进程:(14292)):8
    (12444)子进程(父进程:(14292)):8
    (18196)子进程(父进程:(14292)):9
    (12444)子进程(父进程:(14292)):9
    (14292)主进程: 21
    (14292)主进程: 22
    (14292)主进程: 23
    (14292)主进程: 24
    (14292)主进程: 25
    (14292)主进程: 26
    (14292)主进程: 27
    (14292)主进程: 28
    (14292)主进程: 29
    (14292)主进程: 30
    (14292)主进程: 31
    (14292)主进程: 32
    (14292)主进程: 33
    (14292)主进程: 34
    (14292)主进程: 35
    (14292)主进程: 36
    (14292)主进程: 37
    (14292)主进程: 38
    (14292)主进程: 39
    主进程执行完成!
    

    3. 进程通信

    两种机制
    1. Queue实现进程通信
    import multiprocessing
    
    
    def f(q):
        print('(%s) 进程开始放入数据...' % multiprocessing.current_process().pid)
        q.put('Python')
    
    
    if __name__ == '__main__':
        # 创建进程通信的Queue
        q = multiprocessing.Queue()
        # 创建子进程
        p = multiprocessing.Process(target=f, args=(q,))
        # 启动子进程
        p.start()
        print('(%s) 进程开始取出数据...' % multiprocessing.current_process().pid)
        # 取出数据
        print(q.get())  # Python
        p.join()
    

    输出结果

    (13372) 进程开始取出数据...
    (22196) 进程开始放入数据...
    Python
    
    2. 使用Pipe实现进程通信
    PipeConnection
    import multiprocessing
    
    
    def f(conn):
        print('(%s) 进程开始发送数据...' % multiprocessing.current_process().pid)
        # 使用conn发送数据
        conn.send('Python')
    
    
    if __name__ == '__main__':
        # 创建Pipe,该函数返回两个PipeConnection对象
        parent_conn, child_conn = multiprocessing.Pipe()
        # 创建子进程
        p = multiprocessing.Process(target=f, args=(child_conn,))
        # 启动子进程
        p.start()
        print('(%s) 进程开始接收数据...' % multiprocessing.current_process().pid)
        # 通过conn读取数据
        print(parent_conn.recv())  # Python
        p.join()
    

    输出结果

    (14108) 进程开始接收数据...
    (13360) 进程开始发送数据...
    Python
    

    相关文章

      网友评论

        本文标题:Python基础(十二)并发编程

        本文链接:https://www.haomeiwen.com/subject/bcfwmctx.html