第三天:多任务编程
目标: 完成校花图片下载 http://www.521609.com/qingchunmeinv/
进阶知识点: https://blog.csdn.net/ahhqdyh/article/details/106817492
B站视频资源: https://search.bilibili.com/all?keyword=disenQF&from_source=nav_search_new
3.1 多任务的认知
多任务: 一个程序存在多个并行或串行执行的任务(线程、进程或协程)。
3.1.1 并行与串行
并行: 两个以上的任务在同时运行。并行运行的任务,一般通过进程来实现。
串行: 两个以上的任务依次运行。
3.1.2 同步与异步
任务之间在调用时, 存在同步和异步的问题。
同步: 任务A 调用任务B时,任务A必须等待B任务完成后,才能继续向下执行。
异步: 任务A调用任务B时,可以向B提供一个回调函数,无须等待B任务完成,A可以继续向下执行。当B任务完成后,通过A提供的回调函数,将B任务执行完成的数据回传给A任务。 一般都是基于线程实现异步场景,在HTML的页面中,可以使用ajax实现异步请求。
3.1.3 高并发
站点服务器,在某一时间段内,接收客户端的海量请求(C10K),这一场景称之为高并发。
高并发解决方案: 后台进程+消息队列
后台进程
- 自定义进程服务
- Celery 第三方框架
消息队列
- Redis
- RabbitMQ
- Kafka
3.2 多进程和多线程的关系
进程:为了启动一个程序(式)或软件,系统根据程序的大小来分配内存空间,同时创建进程控制块来描述进程的状态和相关信息(进程ID、所属父进程ID、内存情况等)。当程序所用的资源就绪之后,会创建一个线程来运行程序代码,这个线程称之为主线程(MainThread)。
进程和线程的关系:
- 进程创建时,系统分配内存和创建线程(主线程)及进程控制块。 线程是在进程中运行的。
- 每个进程创建时,都具有独立的内存空间,因此进程之间互不影响。
- 线程在进程中创建,多个线程共享所属进程的内存空间。
- 创建进程时同时创建线程称之为主线程,在主线程中创建的线程,称之为子线程。
在Python中进程模块是multiprocessing
, 线程模块是threading
.
# 进程如何使用:
# 1) 通过Process进程类启动已实现的函数(目标)
# 2) 编写Process类的子类,重写run方法,在run方法中开发进程中执行的程序。
def sum_n(n, e=None):
s= 0
for i in range(1, n+1):
if e and i % e == 0:
continue
s += i
print(n, '以内的所有数据和为', s)
def p():
# 快捷键: ctrl+p 显示方法的参数
# 创建三个进程(子进程), 当前所在的进程是父进程
process1 = multiprocessing.Process(target=sum_n, args=(100,))
process2 = multiprocessing.Process(target=sum_n, args=(1000, 5)) # ctrl+d 快速复制一行
process3 = multiprocessing.Process(target=sum_n, kwargs=dict(n=10000, e=2))
# 启动三个进程
process1.start()
process2.start()
process3.start()
# 等待进程执行完
process1.join()
process2.join()
process3.join()
process_name = multiprocessing.current_process().name # 获取当前进程的名称
print(process_name, '--主进程 Over---')
#!/usr/bin/python3
# coding: utf-8
from multiprocessing import Process
import time
class SumProcess(Process):
def __init__(self, n, e=None):
super().__init__() # 调用父类的初始化方法: 一定写,因类进程的创建由系统,初始化方法由系统调用。
self.n = n
self.e = e
def run(self):
# 当前进程分配资源后,由创建的线程来执行的函数,系统调用的
s = 0
for i in range(1, self.n + 1):
if self.e and i % self.e == 0:
continue
s += i
time.sleep(0.001)
print(self.name, i)
print(self.name, self.n, '以内的所有数据和为', s)
if __name__ == '__main__':
# 创建进程
ps = [ SumProcess(n, e) for n, e in [(100, None), (1000, 5), (10000, 2)] ]
# 启动进程
for p in ps:
p.start()
# p.join() # 三个进程执行的方式变成了串行,注释之后,变成了并行
# 等待进程完成
for p in ps:
p.join()
print('--Over--')
#!/usr/bin/python3
# coding: utf-8
import threading
import multiprocessing
def sum_n(n, e=None):
s= 0
for i in range(1, n+1):
if e and i % e == 0:
continue
s += i
name = threading.current_thread().name # 当前线程的名称
pname = multiprocessing.current_process().name
print(pname, name, n, '以内的所有数据和为', s)
def t():
# 快捷键: ctrl+p 显示方法的参数
# 创建三个进程(子进程), 当前所在的进程是父进程
thread1 = threading.Thread(target=sum_n, args=(100,))
thread2 = threading.Thread(target=sum_n, args=(1000, 5)) # ctrl+d 快速复制一行
thread3 = threading.Thread(target=sum_n, kwargs=dict(n=10000, e=2))
# 启动三个进程
thread1.start()
thread2.start()
thread3.start()
# 等待进程执行完
thread1.join()
thread2.join()
thread3.join()
name = threading.current_thread().name
pname = multiprocessing.current_process().name
print(pname, name, '-- Over---')
if __name__ == '__main__':
# 计算 100以内的和
# 计算 1000以内的和, 排除5的倍数
# 计算 10000以内的和, 排除2的位数
# 线程的方式
t()
#!/usr/bin/python3
# coding: utf-8
from threading import Thread
import time
class SumThread(Thread):
def __init__(self, n, e=None):
super().__init__() # 调用父类的初始化方法: 一定写,因类进程的创建由系统,初始化方法由系统调用。
self.n = n
self.e = e
def run(self):
# 当前进程分配资源后,由创建的线程来执行的函数,系统调用的
s = 0
for i in range(1, self.n + 1):
if self.e and i % self.e == 0:
continue
s += i
time.sleep(0.001)
print(self.name, i)
print(self.name, self.n, '以内的所有数据和为', s)
if __name__ == '__main__':
# 创建进程
ts = [ SumThread(n, e) for n, e in [(100, None), (1000, 5), (10000, 2)] ]
# 启动进程
for p in ts:
p.start()
# p.join() # 三个进程执行的方式变成了串行,注释之后,变成了并行
# 等待进程完成
for p in ts:
p.join()
print('--Over--')
3.3 多进程应用:爬虫服务
# 设计思想: 将写好的爬虫程序封装到后台进程中,通过进程队列接收命令,每一个命令对应是不同的爬虫程序。
# 实现: 一个爬虫服务端进程、一个爬虫客户端进程
# 爬虫服务端的进程类
#!/usr/bin/python3
# coding: utf-8
from multiprocessing import Process, Queue
import requests
class SpiderServerProcess(Process):
def __init__(self, q: Queue):
super(SpiderServerProcess, self).__init__(name='SpiderServer')
self.q = q
def ip_spider(self):
url = 'https://www.kuaidaili.com/free' # get请求
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0'
referer = 'https://www.kuaidaili.com/free'
resp = requests.get(url, headers={'User-Agent': ua, 'Referer': referer})
if resp.status_code == 200:
with open('ip.html', 'wb') as f:
f.write(resp.content)
print('ip 下载成功')
def sug(self,word):
url = 'https://fanyi.baidu.com/sug' # post请求
# form表单的参数 wd,参数值是任意的英文单词
data = {
'kw': word
}
# 请求头的User-Agent
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0'
# 发起了post请求
resp = requests.post(url, data=data, headers={'User-Agent': ua})
if resp.status_code == 200:
ret = resp.json() # 文本是json格式, json.loads(resp.text)
if ret['errno'] == 0:
for item in ret['data']:
print(item['k'], item['v'])
else:
print(ret['errmsg'])
def run(self):
while True:
# 如果timeout超时,则会抛出异常
# 在Ptython中使用try-except语句捕获异常
try:
cmd = self.q.get(timeout=120) # 从队列中获取指令, block为True是表示等待从Queue中可以获取一个元素
print(self.name, cmd) # ip / sug apple
if cmd.startswith('ip'): # ip
self.ip_spider()
elif cmd.startswith('sug'):
word = cmd.split()[-1]
self.sug(word)
except:
break
print('SpiderServer Closeed!')
class SpiderClientProcess(Process):
def __init__(self, q: Queue):
super().__init__(name="SpiderClient")
self.q = q
def run(self):
while True:
cmd = input("SpiderClient:") # 子进程不建议使用input函数获取键盘输入的内容
if cmd == 'exit':
break
self.q.put(cmd)
print('SpiderClient Exit OK!')
if __name__ == '__main__':
# 启动Server进程, 读取Client送的指令
# 启动Client进程, 向Server进程发送指令
# 基于进程队列Queue实现两个进程之间的通信
cmd_queue = Queue()
server = SpiderServerProcess(cmd_queue)
# client = SpiderClientProcess(cmd_queue)
server.start()
while True:
cmd = input("\nSpiderClient>>>")
if cmd == 'exit':
break
cmd_queue.put(cmd)
print('SpiderClient Exit OK!')
server.join()
# 基于Socket实现进程间通信的网络编程
# server
#!/usr/bin/python3
# coding: utf-8
from multiprocessing import Process
from threading import Thread
import socket
import requests
class SpiderServerThread(Thread):
def __init__(self, client):
super(SpiderServerThread, self).__init__()
self.client = client
def send_msg(self, msg):
self.client.send(msg.encode('utf-8'))
def ip_spider(self):
url = 'https://www.kuaidaili.com/free' # get请求
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0'
referer = 'https://www.kuaidaili.com/free'
resp = requests.get(url, headers={'User-Agent': ua, 'Referer': referer})
if resp.status_code == 200:
with open('ip.html', 'wb') as f:
f.write(resp.content)
self.send_msg('ip 下载成功')
def sug(self,word):
url = 'https://fanyi.baidu.com/sug' # post请求
# form表单的参数 wd,参数值是任意的英文单词
data = {
'kw': word
}
# 请求头的User-Agent
ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0'
# 发起了post请求
resp = requests.post(url, data=data, headers={'User-Agent': ua})
if resp.status_code == 200:
ret = resp.json() # 文本是json格式, json.loads(resp.text)
if ret['errno'] == 0:
for item in ret['data']:
self.send_msg('%s,%s' % (item['k'], item['v']))
else:
self.send_msg(ret['errmsg'])
def run(self):
while True:
self.client.send('连接成功'.encode('utf-8')) # 向客户端发送消息
cmd = self.client.recv(1024*8)
cmd_str = cmd.decode('utf-8')
if cmd_str == 'exit':
break
elif cmd_str.startswith('ip'):
self.ip_spider()
elif cmd_str.startswith('sug'):
word = cmd_str.split()[-1]
self.sug(word)
class SpiderServerProcess(Process):
def __init__(self, ):
super(SpiderServerProcess, self).__init__(name='SpiderServer')
self.server = socket.socket() # 网络套接字 , 所有的网络编程的基础
self.server.bind(('localhost', 8000))
self.server.listen()
def run(self):
print('Server Spider Running OK')
while True:
client, address = self.server.accept() # 等待客户端的连接
print(address, '已连接')
SpiderServerThread(client).start()
print('SpiderServer Closeed!')
if __name__ == '__main__':
# 启动Server进程, 读取Client送的指令
# 启动Client进程, 向Server进程发送指令
# 基于进程队列Queue实现两个进程之间的通信
server = SpiderServerProcess()
# client = SpiderClientProcess(cmd_queue)
server.start()
server.join()
# client
#!/usr/bin/python3
# coding: utf-8
import socket
if __name__ == '__main__':
client = socket.socket()
client.connect(('localhost', 8000))
msg = client.recv(1024*8)
print(msg.decode())
while True:
cmd = input("\nSpiderClient>>>")
client.send(cmd.encode()) # 向服务端发送指令
if cmd == 'exit':
break
msg = client.recv(1024*8)
print(msg.decode('utf-8'))
print('SpiderClient Exit OK!')
网友评论