进程概述
一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务.
任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态
进程状态就绪态:运行的条件都已经慢去,正在等在cpu执行
执行态:cpu正在执行其功能
等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态
一个进程默认有一个线程,进程里面可以创建线程,线程是依附在进程里面的,没有进程就没有线程。
创建多进程:Process
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive() 判断当前的线程是否处于活动状态
join([timeout])
情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。
情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用
join这是让主进程等待,子进程是都在运行的,同时启动4个进程,那么四个进程都会同时执行!
run()
只是类的一个普通方法,如果直接调用Run方法,程序中依然只有主线程这一个线程,其程序执行路径还是只有一条,还是要顺序执行,还是要等待run方法体执行完毕后才可继续执行下面的代码,这样就没有达到写线程的目的。
start()
启动线程,真正实现了多线程运行,这时无需等待run方法体代码执行完毕而直接继续执行下面的代码:调用Thread类的 start()方法来启动一个线程,这时此线程处于就绪(可运行)状态,并没有运行,一旦得到cpu时间片,就开始执行run()方法,这里方法 run()称为线程体,当线程结束后,不可以重新启动。
terminate()
强制杀死某一个进程
属性:
group:指定进程组,目前只能使用None
target:执行的目标任务名
name:进程名字
args:以元组方式给执行任务传参
kwargs:以字典方式给执行任务传参
demo:
import multiprocessing
import time
def worker_1(interval): #定义函数,进程加入
print "worker_1"
time.sleep(interval)
print "end worker_1"
def worker_2(interval):
print "worker_2"
time.sleep(interval)
print "end worker_2"
def worker_3(interval):
print "worker_3"
time.sleep(interval)
print "end worker_3"
if __name__ =="__main__":
#创建进程注:target增加的是进程的方法,不要加小括号,arges后面接的是元组,如果只传一个参数后面加逗 号
p1 = multiprocessing.Process(target = worker_1,args = (2,))
p2 = multiprocessing.Process(target = worker_2,args = (3,))
p3 = multiprocessing.Process(target = worker_3,args = (4,))
p1.start() #开启进程
#让子进程直接销毁,表示终止执行, 主进程退出之前,把所有的子进程直接销毁就可以了
p1. terminate()
p2.start()
p3.start()
进程之间不共享全局变量
创建子进程其实是对主进程进行拷贝,进程之间相互独立,访问的全局变量不是同一个,所以进程之间不共享全局变量
主进程会等待所有的子进程执行完成程序再退出
进程间通信-Queue
可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
Queue.qsize():返回当前队列包含的消息数量;
Queue.empty():如果队列为空,返回True,反之False , 注意这个操作是不可靠的。
Queue.full():如果队列满了,返回True,反之False;
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
Queue.get_nowait():相当Queue.get(False);
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
Queue.put_nowait(item):相当Queue.put(item, False);
demo:
import multiprocessing
import time
# 写入数据
def write_data(queue):
for i in range(10):
if queue.full():
print("队列满了")
break
queue.put(i)
time.sleep(0.2)
print(i)
# 读取数据
def read_data(queue):
while True:
# 加入数据从队列取完了,那么跳出循环
if queue.qsize() ==0:
print("队列空了")
break
value = queue.get()
print(value)
if __name__ =='__main__':
# 创建消息队列
queue = multiprocessing.Queue(5)
# 创建写入数据的进程
write_process = multiprocessing.Process(target=write_data,args=(queue,))
# 创建读取数据的进程
read_process = multiprocessing.Process(target=read_data,args=(queue,))
# 启动进程
write_process.start()
# 主进程等待写入进程执行完成以后代码再继续往下执行
write_process.join()
read_process.start()
从队列取值使用get方法,向队列放入值使用put方法
消息队列判断队列是否为空不可靠,可以使用延时和根据个数进行判断
进程池Pool
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务.
进程池同步执行任务表示进程池中的进程在执行任务的时候一个执行完成另外一个才能执行,如果没有执行完会等待上一个进程执行
进程池异步执行任务表示进程池中的进程同时执行任务,进程之间不会等待
multiprocessing.Pool常用函数解析:
apply(func[, args[, kwds]]): 阻塞方式调用函数,args表示以元组方式给函数传参,kwds表示以字典方式给函数传参
apply_async(func[, args[, kwds]]) :使用非阻塞方式调用函数,args表示以元组方式给函数传参,kwds表示以字典方式给函数传参
close():关闭Pool,使其不再接受新的任务;
terminate():不管任务是否完成,立即终止;
join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
demo:
# 进程池:池子里面放的进程,进程池会根据任务执行情况自动创建进程,而且尽量少创建进程,合理利用进程池中的进程完成多任务
import multiprocessing
import time
# 拷贝任务
def work():
print("复制中...", multiprocessing.current_process().pid)
# 获取当前进程的守护状态
# 提示:使用进程池创建的进程是守护主进程的状态,默认自己通过Process创建的进程是不是守住主进程的状态
# print(multiprocessing.current_process().daemon)
time.sleep(0.5)
if __name__ =='__main__':
# 创建进程池
# 3:进程池中进程的最大个数
pool = multiprocessing.Pool(3)
# 模拟大批量的任务,让进程池去执行
for i in range(5):
# 循环让进程池执行对应的work任务
# 同步执行任务,一个任务执行完成以后另外一个任务才能执行
# pool.apply(work)
# 异步执行,任务执行不会等待,多个任务一起执行
pool.apply_async(work)
# 关闭进程池,意思告诉主进程以后不会有新的任务添加进来
pool.close()
# 主进程等待进程池执行完成以后程序再退出
pool.join()
注:在使用进程池的时候一定要先close,然后在join。否则会出现错误,因为进程池没有关闭,还有可能有任务增加到进程池中。
如果注进程死亡,进程池会被销毁,那么进程池中的进程也不会存在。
网友评论