多进程之间通信的限制
- 进程之间是独立的,互不干扰的内存空间。
我们先看个例子
a = 1 #定义全局变量
def func():
global a
a=2 #修改全局变量值
print(a)
func()
print(a)
运行结果:
image.png
再看利用进程运行的例子:
import multiprocessing
a = 1 #定义全局变量
def func():
global a
a=2 #修改全局变量值
print(a)
process = multiprocessing.Process(target=func)
process.start()
process.join() #等待子进程执行完再继续执行
print(a)
image.png
通过上面2个例子运行结果分析:
按通常应该都是2,应该修改了全局变量值,但是这里只有子进程是2,主进程是1。
这是因为进程之间是独立的,互不干扰的内存空间,故子进程修改的,不影响主进程的。
进程间通信的解决方案
image.pngprint('--------------进程间通信的解决方案--------------')
manager = multiprocessing.Manager() #创建一个服务器进程,并返回与其通信的管理器
list_proxy = manager.list() #通过管理器在服务器进程中开辟一个列表空间,并返回一个代理
print(list_proxy) #用法和list一样
def func2(list):
list.append('a')
print(list)
#把代理传给子进程,子进程里就可以通过这个代理,来操作共享空间来进行通信
process2 = multiprocessing.Process(target=func2, args=(list_proxy,))
process2.start()
process2.join() #等待子进程执行完再继续执行
print(list_proxy)
运行结果:
image.png
- 一般常用的空间类型是:
mgr.list()、mgr.dict()、mgr.Queue()
多线程之间通信的限制
注意:因为线程属于同一个进程,因此它们之间共享内存区域,因此全局变量是公共的。
import threading
a = 1
def func3():
global a
a = 2
print(a)
thread = threading.Thread(target=func3)
thread.start()
thread.join()
print(a)
运行结果:
image.png
但是多线程间共享内存间存在竞争问题。
print('--------------多线程共享内存间存在竞争问题--------------')
import threading
data = 0
n = 100000
def add(n):
global data
for i in range(n):
data +=i
def sub(n):
global data
for i in range(n):
data -=i
t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join() #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞
print(data)
image.png
加了n次减了n次,结果却为负数,按正常应该为0。
使用锁来控制共享资源的访问。
print('--------------使用锁来控制共享资源的访问--------------')
import threading
data = 0
n = 1000000
lock = threading.Lock() #生成一把锁
def add(n):
global data
for i in range(n):
# lock.acquire() #加锁
# data +=i
# lock.release() #释放锁
#可以写生上下文格式
with lock:
data +=i
def sub(n):
global data
for i in range(n):
# lock.acquire() #加锁
# data -=i
# lock.release() #释放锁
with lock:
data -=i
t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join() #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞
print(data)
运行结果:
image.png
这样才达到目的,就像去银行存钱取钱,存取不多不少!
线程与进程的安全队列
队列:先进先出,一个入口,一个出口。 image.png- 线程安全队列操作
queue.Queue:
入队: put(item)
出队: get()
测试空: empty() # 近似
测试满: full() # 近似
队列长度: qsize() # 近似
任务结束: task_done()
等待完成: join() - 进程安全队列操作
mgr.Queue:
入队: put(item)
出队: get()
测试空: empty() # 近似
测试满: full() # 近似
队列长度: qsize() # 近似
进程比线程少了task_done()和 join()方法。
生产者和消费者模型
所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑生产只需要往队列里面丢东西(生产者不需要关心消费者)消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)。
image.png
image.png
线程实现生产者-消费者模型
print('--------------生产者与消费者模型--------------')
'''
所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
'''
print('--------------多线程的消费者与生产者模式--------------')
'''
生产者:没满,则生产,只关心队列是否已满。满了就阻塞。
消费者:只关心队列是否为空。不为空,则消费,为空则阻塞。
'''
import threading
import queue
import random
import time
class Producer(threading.Thread): #生产者
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
item = random.randint(0, 10) #创建0~99
#只要队列没满,就向队列中添加数据
self.queue.put(item)
print('生产者-->生产:%s'%item)
time.sleep(1)
class Customer(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
#只要队列不为空,就从队列中取数据
itme = self.queue.get()
print('消费者-->消费:%s'%itme)
time.sleep(1)
q =queue.Queue(5) #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
运行结果:
image.png
进程实现生产者-消费者模型
import multiprocessing
import queue
import random
import time
class Producer(multiprocessing.Process): #生产者
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
item = random.randint(0, 10) #创建0~99
#只要队列没满,就向队列中添加数据
self.queue.put(item)
print('生产者-->生产:%s'%item)
time.sleep(1)
class Customer(multiprocessing.Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
#只要队列不为空,就从队列中取数据
itme = self.queue.get()
print('消费者-->消费:%s'%itme)
time.sleep(1)
manager = multiprocessing.Manager() #创建一个服务器进程,并返回与其通信的管理器
q =manager.Queue(5) #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
运行结果:
image.png
网友评论