Day20复习
进程:
具有生命周期的,动态、可执行的文件
进程的创建
Process(target,参数)
start
join
练习1:多个进程同时处理质数求和
"""
编写程序 求 100000以内的质数之和
编写一个装饰器求 这个过程的时间
使用4个进程做同样的事情,求时间
使用10个进程做同样的事情,求时间
"""
import time
from multiprocessing import Process
# 装饰器求函数的运行时间
def times(func):
def wrapper(*args, **kwargs):
start_time = time.time()
res = func(*args, **kwargs)
end_time = time.time()
print('函数执行时间:', end_time - start_time)
return res
return wrapper
# 自定义的进程类
class Prime(Process):
def __init__(self, begin, end):
super().__init__()
self.begin = begin
self.end = end
# 判断一个数是否为质数
@staticmethod
def is_prime(n):
if n <= 1:
return False
for i in range(2, n // 2 + 1):
if n % i == 0:
return False
return True
# 求质数之和
def run(self):
prime = []
for i in range(self.begin, self.end):
if Prime.is_prime(i):
prime.append(i)
print(sum(prime))
@times
def process_10():
jobs = []
for i in range(1, 100001, 10000):
p = Prime(i, i + 10000)
jobs.append(p)
p.start()
[i.join() for i in jobs]
times
def test():
p = Prime(1, 100001)
p.start()
p.join()
# 函数执行时间1: 13.205774307250977
# 函数执行时间4: 8.017680168151855
# 函数执行时间10: 7.832202911376953
if __name__ == '__main__':
# process_10()
test()
练习2:多进程拷贝文件
"""
拷贝一个目录
假设这个目录有若干文件 需要编写程序,将目录拷贝一份,注意拷贝过程中
需要多文件同时拷贝(进程池)
"""
from multiprocessing import Pool, Queue
import os
q = Queue() # 生成消息队列
# 对文件的拷贝(进程池的事件)从源文件夹拷贝到新文件夹
def copy1(filename, old, new):
fr = open(old + '/' + filename, 'rb')
fw = open(new + '/' + filename, 'wb')
while True:
data = fr.read(1024 * 1024)
if not data:
break
n = fw.write(data)
q.put(n) # 将已经拷贝的大小给父进程
fr.close()
fw.close()
# 获取文件夹(目录)
def total_size(dir):
res = 0
for file in os.listdir(dir):
res += os.path.getsize(dir + '/' + file)
return res
def main(old_folder):
# 创建新的文件夹
new_folder = old_folder.split('/')[-1] + '-备份'
os.mkdir(new_folder)
# 获取文件里列表
file_list = os.listdir(old_folder)
size = total_size(old_folder)
# 创建进程池
pool = Pool(4)
# 循添加事件到环的进程池中
for file in file_list:
pool.apply_async(copy1, args=(file, old_folder, new_folder))
print('拷贝完毕')
pool.close()
# 拷贝百分比(消息队列有关系了)
copy_size = 0
while True:
copy_size += q.get() # 累加
print("已拷贝%.2f%%" % (copy_size / size * 100))
if copy_size >= size:
break
pool.join()
if __name__ == '__main__':
main('/home/tarena/ftp')
僵尸进程
孤儿进程
进程池:
由于我们频繁的创建和销毁进程,会带来大量的资源消耗,所以进程池应运而生
创建有限的进程统一销毁
实践项目:聊天室
提出问题
【1】 有人进入聊天室需要输入姓名,姓名不能重复
【2】 有人进入聊天室时,其他人会收到通知:xxx 进入了聊天室
【3】 一个人发消息,其他人会收到:xxx : xxxxxxxxxxx
【4】 有人退出聊天室,则其他人也会收到通知:xxx退出了聊天室
【5】 扩展功能:服务器可以向所有用户发送公告:管理员消息: xxxxxxxxx
分析问题
客户端
服务器端
UDP
TCP
L C Q
服务器功能
功能
登录
判断这个用户是否存在
是否可以进入聊天室
通知其他人
加入用户字典
聊天
把消息发出去
退出
退出
删除用户
创建套接字
进程
子进程
请求处理客户端的请求相对应的功能
处理管理员消息
父进程
客户端的功能
功能
登录
输入名字,正确 登录
聊天
发消息
退出
收消息
链接
进入聊天室
接收反馈
子进程负责发送消息
父进程负责接收消息
解决问题
给出代码
测试
编写技术文档
chat_server.py0如下
项目名称 版本号 作者 公司 Python版本3.6
from socket import *
from multiprocessing import Process
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST, PORT)
# 用户的存储信息{name:address}
user = {}
# 登录
def do_login(sock, name, address):
if name in user or '管理' in name:
sock.sendto(b'Fail', address)
else:
sock.sendto(b'OK', address)
# 先通知其他人
msg = "欢迎 %s 进入聊天室" % name
for key, value in user.items():
sock.sendto(msg.encode(), value)
# 加入用户
user[name] = address
# 聊天
def do_chat(sock, name, content):
"""
# 聊天
:param sock: 套接字
:param name: 谁发的消息
:param content: 发的什么
"""
msg = '%s : %s' % (name, content)
# 通知其他人
for key, value in user.items():
# 刨除本人
if key != name:
sock.sendto(msg.encode(), value)
# 退出
def do_exit(sock, name):
del user[name]
msg = '%s 退出聊天室' % name
for key, value in user.items():
sock.sendto(msg.encode(), value)
# 处理客户端的请求
def handle(sock):
# 循环接收用户的请求
while True:
data, addr = sock.recvfrom(1024)
# 根据内容分情况讨论
temp = data.decode().split(' ', 2)
# L 登录 L xixi
if temp[0] == 'L':
do_login(sock, temp[1], addr)
elif temp[0] == 'C': # C 聊天
do_chat(sock, temp[1], temp[2])
elif temp[0] == 'Q': # Q 退出
do_exit(sock, temp[1])
else:
print('客户端所发请求有误')
# 搭建总体的逻辑结构
def main():
# 创建UDP套接字
sock = socket(AF_INET, SOCK_DGRAM)
# 绑定
sock.bind(ADDR)
# 创建进程(子进程负责客户端的请求内容)
p = Process(target=handle, args=(sock,), daemon=True)
p.start()
# 父进程发送管理员消息
while True:
content = input("管理员消息:")
if content == 'exit':
break
msg = 'C 管理员消息: ' + content
# 父进程# 发送管理员消息给子进程
sock.sendto(msg.encode(), ADDR)
if __name__ == '__main__':
main()
chat_client.py如下:
"""
客户端
"""
from socket import *
from multiprocessing import Process
import sys
server_addr = ('127.0.0.1', 8888)
# 登录
def do_login(sock):
while True:
name = input("请输入你的昵称:")
msg = 'L ' + name
sock.sendto(msg.encode(), server_addr)
# 等待服务器的反馈
result, addr = sock.recvfrom(128)
if result == b'OK':
print('您已经进入聊天室')
return name
else:
print("该昵称已存在")
# 聊天
def do_chat(sock, name):
p = Process(target=recv_msg,args=(sock,),daemon=True)
p.start()
send_msg(sock,name)# 发送消息
# 子进程接收
def recv_msg(sock):
while True:
data, addr = sock.recvfrom(1024 * 10)
msg = '\n' + data.decode() + '\n发言'
print(msg, end='')
# 父进程发送
def send_msg(sock, name):
while True:
try:
content = input("发言:")
except KeyboardInterrupt:
content = 'exit'
if content == 'exit':
msg = 'Q ' + name
sock.sendto(msg.encode(), server_addr)
sys.exit("您已经退出聊天室")
msg = "C %s %s" % (name, content)
sock.sendto(msg.encode(), server_addr)
def main():
# 创建套接字
tcp_sock = socket(AF_INET, SOCK_DGRAM)
# 进入聊天室
name = do_login(tcp_sock)
# 聊天
do_chat(tcp_sock,name)
if __name__ == '__main__':
main()
总结:
进程池
必要性:
当我们频繁创建销毁进程的时候,任务简单,量小可以选择进程池
孤儿进程和僵尸进程
孤儿父进程先于子进程退出
子进程由系统分配的新进程作为父进程,子进程退出,这个新进程会处理
僵尸
子进程先于父进程退出父进程没有处理子进程的资源
join回收
忽略子进程的退出
singal
自定义的进程类
定义一个类继承Process
继承父类的构造函数
重写父类的run方法
start
调用run,开启进程
进程间的通信
由于进程的空间相互独立 互不影响
想要获取信息,通过消息队列进行通信
Queue
put()
get()可能会阻塞
网友评论