美文网首页python
Python基础(十二)并发编程

Python基础(十二)并发编程

作者: 鹊南飞_ | 来源:发表于2019-10-24 18:41 被阅读0次

1. 线程概述

几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每一个运行中的程序就是一个进程。当一个程序进入内存运行时,即变成一个进程。进程是处于运行过程中的程序,并且具有一定的独立功能。进程是系统进行资源分配和调度的一个独立单位。
一般而言,进程包含以下三个特征。

  • 独立性: 进程是系统中独立存在的实体,它可以拥有自己的独立的资源,每一个进程都拥有自己的私有的地址空间。在没有经过进程本身允许的情况下,一个用户进程不可以直接访问其他进程的地址空间。
  • 动态性:进程与程序的区别在于,程序只是一个静态的指令集合,而进程是一个正在系统中活动的指令集合。在进程中加入了时间的概念。进程具有自己的生命周期和各种不同的状态,在程序中是没有这些概念的。
  • 并发性:多个进程可以在单个处理器上并发执行,多个进程之间不会互相影响。

并发和并行是两个概念,并行指在同一个时刻有多条指令在多个处理器上同时执行;并发指在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行,使得宏观上具有多个进程同时执行的效果。

操作系统可以同时执行多个任务,每一个任务就是一个进程;进程可以同时执行多个任务,每一个任务就是一个线程。
使用多线程编程具有以下几个优点:

  • 进程之间不能共享内存,线程之间共享内存非常容易。
  • 使用多线程来实现多任务并发执行比使用多进程的效率高
  • Python语言内置了多线程功能支持

2. 线程的创建和启动

threading模块

  • 使用threading模块的Thread类的构造器创建线程
  • 继承threading模块的Thread类创建线程类

1. 使用threading模块的Thread类的构造器创建线程

构造器如下

__init__(self, group=None,target=None,name=None.args=().kwargs=None, * , daemon=None)
构造器参数

步骤如下

  • 使用构造器创建线程对象
  • 调用start()方法启动该线程

示例代码如下:

import threading


def dog(num):
    print(num)


if __name__ == '__main__':
    t1 = threading.Thread(target=dog, args=(10,))
    t1.start()
    t2 = threading.Thread(target=dog, args=(20,))
    t2.start()


  • 在进行多线程编程时,不要忘记Python程序运行时默认的主线程。
  • 多线程就是让多个函数并发执行,让普通用户感觉到多个函数似乎同时在执行。

2. 继承threading模块的Thread类创建线程类

步骤如下:

  • 定义Thread类的子类,重写该类的run()方法。
  • 创建Thread子类的示例,即创建线程对象。
  • 调用线程对象的start()方法来启动线程。

示例代码如下:

import threading


class Dog(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        print(self.num)


if __name__ == '__main__':
    t1 = Dog(10)
    t1.start()
    t2 = Dog(20)
    t2.start()


推荐使用第一种方式来创建进程,因为编程简单,同时具有更清晰的逻辑结构

3. 线程的生命周期

  • 新建(New)
  • 就绪(Ready)
  • 运行(Running)
  • 堵塞(Blocked)
  • 死亡(Dead)

1. 新建和就绪状态

创建一个Thread对象或Thread子类的对象之后,该线程就处于新建状态。
当线程对象调用了start()方法后,该线程就处于就绪状态。


注意
注意

2. 运行和堵塞状态

当一个线程开始运行后,它不可能一直处于运行状态(除非它的线程执行体足够短,瞬间就结束了),线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调度的细节取决于底层平台所采用的策略。
进行阻塞状态的情况:

  • 线程调用了sleep()方法主动放弃所占用的处理器资源
  • 线程调用了一个阻塞式I/O方法,在该方法返回之前,该线程被阻塞
  • 线程试图获得一个锁对象,但该锁对象正被其他线程所持有
  • 线程在等待某个通知

解除阻塞,重新进入就绪状态额情况

  • 调用sleep()方法的线程经过了阻塞的时间
  • 线程调用的阻塞式I/O方法已经返回
  • 线程成功地获得了试图获取的锁对象
  • 线程正在等待某个通知时,其他线程发出了一个通知
线程状态转换图

3. 线程死亡

线程处于死亡状态:

  • run()方法或代表线程执行体的target函数执行完成,线程正常结束
  • 线程抛出一个未捕获的Exception或Error


    注意
    注意
    注意

4. 控制线程

1. join线程

Thread提供了让一个线程等待另一个线程完成的方法join()方法。
当某个程序执行六中调用其他线程的join()方法时,调用线程将被阻塞,直到join方法加入的join线程执行完成。
join(timeout=None)方法可以指定一个timeout参数,该参数指定等待被join线程的时间最长为timeout秒。如果在timeout秒内被join的线程还没有执行结束,则不再等待。

import threading


def dog(num):
    for i in range(num):
        print('{} {}'.format(threading.current_thread().name, i))


if __name__ == '__main__':
    t1 = threading.Thread(target=dog, args=(5,))
    t1.start()
    t1.join()
    t2 = threading.Thread(target=dog, args=(10,))
    t2.start()
    t2.join()
    print('执行结束')

加入join后,t2只会在t1执行后才开始运行,运行结果为

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3
Thread-1 4
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-2 5
Thread-2 6
Thread-2 7
Thread-2 8
Thread-2 9
执行结束

去掉join之后

import threading


def dog(num):
    for i in range(num):
        print('{} {}'.format(threading.current_thread().name, i))


if __name__ == '__main__':
    t1 = threading.Thread(target=dog, args=(5,))
    t1.start()
    # t1.join()
    t2 = threading.Thread(target=dog, args=(10,))
    t2.start()
    # t2.join()
    print('执行结束')

去掉join之后,运行结果

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3
Thread-1 4
Thread-2 0
Thread-2 1
执行结束
Thread-2 2
Thread-2 3
Thread-2 4
Thread-2 5
Thread-2 6
Thread-2 7
Thread-2 8
Thread-2 9

2. 后台线程

有一种线程,它是后台运行的,它的任务是为其他线程提供服务,这种线程被称为“后台线程”(Daemon Thread),又称为“守护线程”或“精灵线程”。Python解释器的垃圾回收线程就是典型的后台线程。
后台线程有一个特征:如果所有的前台线程都死亡了,那么后台线程就会自动死亡。
调用Thread对象的daemon属性可以将指定线程设置成后台线程。

import threading


def dog(num):
    for i in range(num):
        print('{} {}'.format(threading.current_thread().name, i))


if __name__ == '__main__':
    t1 = threading.Thread(target=dog, args=(1000,))
    # 将t1设置为后台线程,然后启动该线程
    # 该线程本应该运行到第1000次循环,但无法运行到1000
    # 因为程序中的主线程也就是唯一的前台线程结束后,程序会主动退出,后台线程也会被结束了
    t1.daemon = True
    t1.start()
    t2 = threading.Thread(target=dog, args=(10,))
    t2.start()
    print('执行结束')

运行结果

# 省略之前的结果
Thread-2 4
Thread-2 5
Thread-2 6
Thread-2 7
Thread-1 29
Thread-1 30
执行结束
Thread-2 8
Thread-1 31
Thread-2 9

创建后台线程有两种方式:

  • 主动将线程的daemon属性设置为True
  • 后台线程启动的线程默认是后台线程
注意

3. 线程睡眠

如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,可以调用time模块的sleep(secs)函数来实现,该函数可以指定一个secs参数,用于指定线程阻塞多少秒

import time

for i in range(10):
    local_time = time.localtime(time.time())
    print(time.strftime("%Y-%m-%d %H:%M:%S", local_time))
    time.sleep(2)

可以看出每隔两秒,就输出一条记录

5. 线程同步

1. 线程安全问题

经典的问题:银行取钱问题


银行取钱问题步骤
import threading
import time


# 账户类
class Account:
    def __init__(self, account_no, balance):
        # 账户编号和账户余额
        self.account_no = account_no
        self.balance = balance


# 取钱操作
def draw(account, draw_amount):
    # 账户余额大于取钱数
    if account.balance >= draw_amount:
        time.sleep(0.001)
        account.balance -= draw_amount
        print('{}取钱成功,余额为{}'.format(threading.current_thread().name, account.balance))
    else:
        print('取钱失败,余额不足')


acct = Account('abc', 1000)
t1 = threading.Thread(name='小红', target=draw, args=(acct, 800))
t2 = threading.Thread(name='小明', target=draw, args=(acct, 800))
t1.start()
t2.start()

输出结果

小明取钱成功,余额为200
小红取钱成功,余额为-600

虽然是程序中人为使用time.sleep(0.001)来强制线程调度切换,但这种切换也是完全可能发生的。

2. 同步锁

Python的threading模块引入了锁(lock),提供了Lock和RLock两个类


同步锁
通用Rlock的代码格式

修改以上代码

import threading
import time


# 账户类
class Account:
    def __init__(self, account_no, balance):
        # 账户编号和账户余额
        self.account_no = account_no
        self._balance = balance
        # 增加锁
        self.lock = threading.RLock()

    def get_balance(self):
        return self._balance

    # 取钱操作
    def draw(self, draw_amount):
        # 加锁
        self.lock.acquire()
        try:
            # 账户余额大于取钱数
            if self._balance >= draw_amount:
                time.sleep(0.001)
                self._balance -= draw_amount
                print('{}取钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
            else:
                print('{}取钱失败,余额不足'.format(threading.current_thread().name))
        finally:
            self.lock.release()


# 取钱操作
def draw(account, draw_amount):
    account.draw(draw_amount)


acct = Account('abc', 1000)
t1 = threading.Thread(name='小红', target=draw, args=(acct, 800))
t2 = threading.Thread(name='小明', target=draw, args=(acct, 800))
t1.start()
t2.start()

输出结果

小红取钱成功,余额为200
小明取钱失败,余额不足

安全访问逻辑: 加锁,修改, 释放锁


提示

可变类的线程是以降低程序的运行效率作为代价的,为了减少线程安全所带来的的负面影响,程序可以采用如下策略:

  • 不要对线程安全类的所有方法都进行永不,只对那些会改变竞争资源(共享资源)的方法进行同步。(如上述例子中的account_no实例变量就无需同步)
  • 如果可变类有两种运行环境:单线程环境和多线程环境,则应该为该可变类提供两种版本,即线程不安全版本和线程安全版本,在单线程环境中使用线程不安全版本以保证性能,在多线程环境中使用线程安全版本

3. 死锁

当两个线程相互等待对方释放同步监视器时就会出现死锁。
Python解释器没有检测,也没有采取措施来处理死锁情况,所以在进行多线程编程时应该采取措施来避免死锁。一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程都处于堵塞状态,无法继续。

避免死锁的常见方式:

  • 避免多次锁定: 尽量避免用一个线程对多个Lock进行锁定。
  • 具用相同加锁顺序:如果多个线程需要对多个Lock进行锁定,则应该保证它们以相同的顺序请求加锁。
  • 使用定时锁: acquire()方法加锁时,可以指定timeout参数
  • 死锁检测:死锁检测是一种依靠算法机制来实现的死锁预防机制,它主要是针对那些不可使用按序加锁和定时锁的场景的。

6. 线程通信

1. 使用Condition实现线程通信

Condition
import threading
import time


# 账户类
class Account:
    def __init__(self, account_no, balance):
        # 账户编号和账户余额
        self.account_no = account_no
        self._balance = balance
        self.cond = threading.Condition()
        # 是否存钱的旗标
        self._flag = False

    def get_balance(self):
        return self._balance

    # 取钱操作
    def draw(self, draw_amount):
        # 加锁
        self.cond.acquire()
        try:
            # sele._flag为True时,等待存钱,取钱方法被堵塞
            if not self._flag:
                self.cond.wait()
            else:
                self._balance -= draw_amount
                print('{}取钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
                self._flag = False
                # 唤醒其他线程
                self.cond.notify_all()
        finally:
            self.cond.release()

    # 存钱操作
    def deposit(self, deposit_amount):
        # 加锁
        self.cond.acquire()
        try:
            # sele._flag为False时,等待取钱,存钱方法被堵塞
            if self._flag:
                self.cond.wait()
            else:
                self._balance += deposit_amount
                print('{}存钱成功,余额为{}'.format(threading.current_thread().name, self._balance))
                self._flag = True
                # 唤醒其他线程
                self.cond.notify_all()
        finally:
            self.cond.release()


# 取钱操作
def draw_many(account, draw_amount, max):
    for i in range(max):
        account.draw(draw_amount)


# 存钱操作
def deposit_many(account, deposit_amount, max):
    for i in range(max):
        account.deposit(deposit_amount)


acct = Account('abc', 0)
t1 = threading.Thread(name='取钱A', target=draw_many, args=(acct, 800, 100))
t2 = threading.Thread(name='存钱B', target=deposit_many, args=(acct, 800, 100))
t3 = threading.Thread(name='存钱C', target=deposit_many, args=(acct, 800, 100))
t4 = threading.Thread(name='存钱D', target=deposit_many, args=(acct, 800, 100))
t1.start()
t2.start()
t3.start()
t4.start()

运行结果

存钱B存钱成功,余额为800
取钱A取钱成功,余额为0
存钱C存钱成功,余额为800
取钱A取钱成功,余额为0
存钱C存钱成功,余额为800
取钱A取钱成功,余额为0
存钱D存钱成功,余额为800
取钱A取钱成功,余额为0
存钱D存钱成功,余额为800
取钱A取钱成功,余额为0
存钱C存钱成功,余额为800
取钱A取钱成功,余额为0
存钱B存钱成功,余额为800
取钱A取钱成功,余额为0
存钱B存钱成功,余额为800
取钱A取钱成功,余额为0
存钱D存钱成功,余额为800
取钱A取钱成功,余额为0
存钱B存钱成功,余额为800
取钱A取钱成功,余额为0
存钱C存钱成功,余额为800
# 省略之后的结果

2. 使用队列Queue控制线程通信

三个队列类
三个队列类的属性和方法
import queue

# 定义一个长度为2的队列
q = queue.Queue(2)
q.put('1')
print('111')
q.put('2')
print('222')
q.put(3)
print('333')

结果

111
222

队列的长度为2, 使用put()放入两个元素时,顺利执行。放入第三个元素时,会堵塞进程。
当然,当Queue已空的情况下,使用get()方法也会堵塞进程

import queue
import threading
import time


def product(q):
    fruit = ['apple', 'pear', 'banana']
    for i in range(10000):
        time.sleep(0.2)
        # 尝试放入元素,如果队列已满,则会被阻塞
        q.put(fruit[i % 3])
        print('{}生产元组元素'.format(threading.current_thread().name))


def consumer(q):
    while True:
        time.sleep(0.2)
        # 尝试取出元素,如果队列已空,则会被阻塞
        t = q.get()
        print('{}消费元素{}'.format(threading.current_thread().name, t))


# 创建容量为3的队列
q = queue.Queue(3)
# 启动三个生产者线程,一个消费者线程
t1 = threading.Thread(target=product, args=(q,))
t2 = threading.Thread(target=product, args=(q,))
t3 = threading.Thread(target=product, args=(q,))
t4 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()
t3.start()
t4.start()

输出结果

Thread-3生产元组元素
Thread-4消费元素apple
Thread-1生产元组元素
Thread-2生产元组元素
Thread-4消费元素apple
Thread-2生产元组元素
Thread-3生产元组元素
Thread-4消费元素apple
Thread-1生产元组元素
Thread-4消费元素pear
Thread-1生产元组元素
Thread-4消费元素pear
Thread-1生产元组元素
Thread-4消费元素pear
Thread-1生产元组元素
# 省略之后的结果

3. 使用Event控制线程通信

Event是一种非常简单的线程通信机制:一个线程发出一个Event,另一个线程可以通过该Event被触发
Event提供了以下方法

  • is_set():该方法返回Event的内部旗标是否未True。
  • set(): 该方法会把Event的内部旗标设置为True,并唤醒所有处于等待状态的线程。
  • clear():改方法会把Event的内部旗标设置为False,通常会调用wait()方法来阻塞当前线程。
  • wait(timeout=None):该方法会阻塞当前线程
import threading
import time

event = threading.Event()


def cal(name):
    print('{}进入阻塞状态'.format(name))
    # 进入阻塞状态
    event.wait()
    print('{}进入运行状态'.format(name))


t1 = threading.Thread(target=cal, args=('a',))
t2 = threading.Thread(target=cal, args=('b',))
t1.start()
t2.start()
time.sleep(2)
print('-----------')
event.set()

输出结果

a进入阻塞状态
b进入阻塞状态
-----------
a进入运行状态
b进入运行状态
注意

7. 线程池

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好的提供性能。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
使用线程池还可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降。

1. 使用线程池

线程池的基类是concurrent.futures模块中的Executor,Executor有两个子类,其中ProcessPoolExecutor用于创建进程池,ThreadPoolExecutor用于创建线程池。


Executor
Future
线程池执行步骤
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def action(max):
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))


# 创建一个包含两个线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交两个任务
future1 = pool.submit(action, 10)
future2 = pool.submit(action, 20)
# 判断任务1是否结束
print(future1.done())
time.sleep(3)
# 判断任务2是否结束
print(future2.done())
# 查看返回结果
print(future1.result())
print(future2.result())
# 关闭线程池
pool.shutdown()

输出结果

ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 3
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_0 5
ThreadPoolExecutor-0_0 6
ThreadPoolExecutor-0_0 7
ThreadPoolExecutor-0_0 8
ThreadPoolExecutor-0_0 9
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 3
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_0 5
ThreadPoolExecutor-0_0 6
ThreadPoolExecutor-0_0 7
ThreadPoolExecutor-0_0 8
True
ThreadPoolExecutor-0_0 9
ThreadPoolExecutor-0_0 10
ThreadPoolExecutor-0_0 11
ThreadPoolExecutor-0_0 12
ThreadPoolExecutor-0_0 13
ThreadPoolExecutor-0_0 14
ThreadPoolExecutor-0_0 15
ThreadPoolExecutor-0_0 16
ThreadPoolExecutor-0_0 17
ThreadPoolExecutor-0_0 18
ThreadPoolExecutor-0_0 19
True
None
None

2. 获取执行结果

可以用Future的result()来获取返回值,当该方法会阻塞当前主线程。
可以通过Future的add_done_callback()方法来添加回调函数。当线程完成后,程序会自动触发该回调函数,并将对应的Future对象作为参数传给该回调函数。

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def action(max):
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))
    time.sleep(0.5)
    return max


def get_result(future):
    print(future.result())


# 创建一个包含两个线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交两个任务
future1 = pool.submit(action, 10)
future2 = pool.submit(action, 20)
# 查看返回结果
future1.add_done_callback(get_result)
future2.add_done_callback(get_result)
print('-----------')
# 关闭线程池
pool.shutdown()

输出结果

ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 3
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_1 0
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_1 2
ThreadPoolExecutor-0_1 3
ThreadPoolExecutor-0_1 4
ThreadPoolExecutor-0_1 5
ThreadPoolExecutor-0_1 6
ThreadPoolExecutor-0_1 7
-----------
ThreadPoolExecutor-0_0 5
ThreadPoolExecutor-0_0 6
ThreadPoolExecutor-0_0 7
ThreadPoolExecutor-0_1 8
ThreadPoolExecutor-0_1 9
ThreadPoolExecutor-0_1 10
ThreadPoolExecutor-0_1 11
ThreadPoolExecutor-0_1 12
ThreadPoolExecutor-0_1 13
ThreadPoolExecutor-0_1 14
ThreadPoolExecutor-0_1 15
ThreadPoolExecutor-0_1 16
ThreadPoolExecutor-0_1 17
ThreadPoolExecutor-0_1 18
ThreadPoolExecutor-0_1 19
ThreadPoolExecutor-0_0 8
ThreadPoolExecutor-0_0 9
20
10

8. 线程相关类

1. 线程局部变量

threading模块下的local()函数

import threading

# 定义线程局部变量
my_data = threading.local()


def action(max):
    for i in range(max):
        try:
            my_data.x += i
        except:
            my_data.x = i
        print('{}中mydata.x的值为{}'.format(threading.current_thread().name, my_data.x))


t1 = threading.Thread(target=action, args=(10,))
t2 = threading.Thread(target=action, args=(10,))
t1.start()
t2.start()

输出结果

Thread-1中mydata.x的值为0
Thread-1中mydata.x的值为1
Thread-1中mydata.x的值为3
Thread-1中mydata.x的值为6
Thread-1中mydata.x的值为10
Thread-1中mydata.x的值为15
Thread-1中mydata.x的值为21
Thread-1中mydata.x的值为28
Thread-1中mydata.x的值为36
Thread-1中mydata.x的值为45
Thread-2中mydata.x的值为0
Thread-2中mydata.x的值为1
Thread-2中mydata.x的值为3
Thread-2中mydata.x的值为6
Thread-2中mydata.x的值为10
Thread-2中mydata.x的值为15
Thread-2中mydata.x的值为21
Thread-2中mydata.x的值为28
Thread-2中mydata.x的值为36
Thread-2中mydata.x的值为45

2. 定时器

Thread类中有一个Timer子类,该子类可用于控制指定函数在特定时间内执行一次。
只能控制函数在指定时间内执行一次,如果要多次执行,需要再执行下一次调度。
取消调度,可调用Timer对象的cancel()函数。

from threading import Timer


def hello():
    print('hello')


# 10秒后执行hello函数
t = Timer(10, hello)
t.start()

输出结果

hello

3. 任务调度

Python提供的sched模块


sched

9. 多线程

1. 使用fork创建新进场

os模块提供了一个fork()方法

import os

print('父进程(%s)开始执行' % os.getpid())
# 开始fork一个子进程
# 从这行代码开始,下面代码都会被两个进程执行
pid = os.fork()
print('进程进入:%s' % os.getpid())
# 如果pid为0,表明子进程
if pid == 0:
    print('子进程,其ID为 (%s), 父进程ID为 (%s)' % (os.getpid(), os.getppid()))
else:
    print('我 (%s) 创建的子进程ID为 (%s).' % (os.getpid(), pid))
print('进程结束:%s' % os.getpid())

输出结果

父进程(15329)开始执行
进程进入:15329
我 (15329) 创建的子进程ID为 (15330).
进程结束:15329
进程进入:15330
子进程,其ID为 (15330), 父进程ID为 (15329)

window上运行会报错
报错信息为:AttributeError: module 'os' has no attribute 'fork'


注意

2. 使用multiprocessing.Process创建新进程

import multiprocessing
import os


# 定义一个普通的action函数,该函数准备作为进程执行体
def action(max):
    for i in range(max):
        print("(%s)子进程(父进程:(%s)):%d" %
              (os.getpid(), os.getppid(), i))


if __name__ == '__main__':
    # 下面是主程序(也就是主进程)
    for i in range(40):
        print("(%s)主进程: %d" % (os.getpid(), i))
        if i == 20:
            # 创建并启动第一个进程
            mp1 = multiprocessing.Process(target=action, args=(10,))
            mp1.start()
            # 创建并启动第一个进程
            mp2 = multiprocessing.Process(target=action, args=(10,))
            mp2.start()
            mp2.join()
    print('主进程执行完成!')

输出结果

(14292)主进程: 0
(14292)主进程: 1
(14292)主进程: 2
(14292)主进程: 3
(14292)主进程: 4
(14292)主进程: 5
(14292)主进程: 6
(14292)主进程: 7
(14292)主进程: 8
(14292)主进程: 9
(14292)主进程: 10
(14292)主进程: 11
(14292)主进程: 12
(14292)主进程: 13
(14292)主进程: 14
(14292)主进程: 15
(14292)主进程: 16
(14292)主进程: 17
(14292)主进程: 18
(14292)主进程: 19
(14292)主进程: 20
(12444)子进程(父进程:(14292)):0
(18196)子进程(父进程:(14292)):0
(12444)子进程(父进程:(14292)):1
(18196)子进程(父进程:(14292)):1
(12444)子进程(父进程:(14292)):2
(18196)子进程(父进程:(14292)):2
(18196)子进程(父进程:(14292)):3
(12444)子进程(父进程:(14292)):3
(18196)子进程(父进程:(14292)):4
(12444)子进程(父进程:(14292)):4
(18196)子进程(父进程:(14292)):5
(12444)子进程(父进程:(14292)):5
(18196)子进程(父进程:(14292)):6
(12444)子进程(父进程:(14292)):6
(18196)子进程(父进程:(14292)):7
(12444)子进程(父进程:(14292)):7
(18196)子进程(父进程:(14292)):8
(12444)子进程(父进程:(14292)):8
(18196)子进程(父进程:(14292)):9
(12444)子进程(父进程:(14292)):9
(14292)主进程: 21
(14292)主进程: 22
(14292)主进程: 23
(14292)主进程: 24
(14292)主进程: 25
(14292)主进程: 26
(14292)主进程: 27
(14292)主进程: 28
(14292)主进程: 29
(14292)主进程: 30
(14292)主进程: 31
(14292)主进程: 32
(14292)主进程: 33
(14292)主进程: 34
(14292)主进程: 35
(14292)主进程: 36
(14292)主进程: 37
(14292)主进程: 38
(14292)主进程: 39
主进程执行完成!

3. 进程通信

两种机制
1. Queue实现进程通信
import multiprocessing


def f(q):
    print('(%s) 进程开始放入数据...' % multiprocessing.current_process().pid)
    q.put('Python')


if __name__ == '__main__':
    # 创建进程通信的Queue
    q = multiprocessing.Queue()
    # 创建子进程
    p = multiprocessing.Process(target=f, args=(q,))
    # 启动子进程
    p.start()
    print('(%s) 进程开始取出数据...' % multiprocessing.current_process().pid)
    # 取出数据
    print(q.get())  # Python
    p.join()

输出结果

(13372) 进程开始取出数据...
(22196) 进程开始放入数据...
Python
2. 使用Pipe实现进程通信
PipeConnection
import multiprocessing


def f(conn):
    print('(%s) 进程开始发送数据...' % multiprocessing.current_process().pid)
    # 使用conn发送数据
    conn.send('Python')


if __name__ == '__main__':
    # 创建Pipe,该函数返回两个PipeConnection对象
    parent_conn, child_conn = multiprocessing.Pipe()
    # 创建子进程
    p = multiprocessing.Process(target=f, args=(child_conn,))
    # 启动子进程
    p.start()
    print('(%s) 进程开始接收数据...' % multiprocessing.current_process().pid)
    # 通过conn读取数据
    print(parent_conn.recv())  # Python
    p.join()

输出结果

(14108) 进程开始接收数据...
(13360) 进程开始发送数据...
Python

相关文章

网友评论

    本文标题:Python基础(十二)并发编程

    本文链接:https://www.haomeiwen.com/subject/bcfwmctx.html