Many people say they do not know what they want; in truth, they lack the courage to face it and the effort to go after what they want!
Summary:
- requests' GET method is like opening a brand-new browser each time; a Session keeps one set of cookies across multiple requests.
- Sites that allow "remember me": as long as the cookies have not expired, they can be reused in different browsers to skip the login repeatedly, even if the account has since been logged out.
- Key points: how sessions work, and the difference between requests.Session().get and requests.request('GET').
- Crawling: choose third-party tools according to your data volume.
- The Event class event mechanism: a global "flag" is maintained; while the flag is False, event.wait() blocks, and once the flag is True, event.wait() no longer blocks (see the sketch after this list).
- The right way to build a crawler: modularize your own crawler; it can easily grow to the level of a framework.
- Modular design: separate components for URL collection, page fetching, parsing and storage, which can run on different machines and different nodes.
- if __name__ == '__main__' means: when the .py file is run directly, the block under if __name__ == '__main__' is executed; when the .py file is imported as a module, that block is not executed.
- tag = channel.basic_consume() / channel.start_consuming() (runs forever) only stops after all consumers have been cancelled; channel.basic_get is recommended instead.
- Decentralized: the separately deployed programs do not communicate with each other; each does its own thing.
- Distributed: the separately deployed programs do communicate with each other.
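A quick illustration of the Event mechanism mentioned above (a minimal sketch; the worker function and timing are made up for the example):

import threading, time

event = threading.Event()          # the internal flag starts as False

def worker():
    print('waiting for the flag...')
    event.wait()                   # blocks while the flag is False
    print('flag is True, going on')

threading.Thread(target=worker).start()
time.sleep(1)
event.set()                        # flip the flag to True; every wait() returns immediately from now on

Note that event.wait(timeout) also returns the flag value after at most timeout seconds, which is why the crawler loops later in these notes use while not event.wait(1): each pass sleeps for up to one second and the loop exits once the event is set.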
1. Using a session to simulate login-free access to oschina (the same applies to Sina)
Generally, after logging in, a user can act under that identity for some time without logging in again. Cookie technology is usually what makes this possible.
After login, the user receives a cookie value; it is kept in the current browser session and, as long as it does not expire, can be kept for a long time.
Every time the user sends a request to the server, these cookies are submitted with it; the server analyses the information in the cookies to confirm the user's identity, and if it is a trusted identity, the user can keep using the site's features.
Session objects let you persist certain parameters across requests. They also keep cookies across all requests made from the same Session instance.
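A minimal sketch of the difference (httpbin.org is used here purely as an example endpoint):

import requests

# each bare request starts with an empty cookie jar, like opening a fresh browser
r1 = requests.request('GET', 'https://httpbin.org/cookies/set/k/v', allow_redirects=False)

# a Session reuses the same cookie jar for every request it makes
with requests.Session() as s:
    s.get('https://httpbin.org/cookies/set/k/v', allow_redirects=False)
    r2 = s.get('https://httpbin.org/cookies')  # the cookie set above is sent back automatically
    print(r2.json())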
Cookie
Invented by Netscape. A cookie is generally a name=value pair, but it can also carry an expire (expiration time), path, domain, secure flag and other attributes.
Comparing the cookie values before and after login, we find that oscid only appears after login.
So put that HTTP request header into the code (or, while logged in, use the headers of another page you are redirected to).
Copy the request information of the logged-in, login-free request, including its cookies, into Postman.
In Postman, open the Code generator and choose Python to get the request as code.
Remove payload and Postman-Token, and replace requests.request("GET", ...) with a session call, to get the code we want:
import requests
url = "https://www.oschina.net/"
# payload = ""
headers = {
'Host': "www.oschina.net",
'Connection': "keep-alive",
'Upgrade-Insecure-Requests': "1",
'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36",
'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
'Sec-Fetch-Site': "same-origin",
'Sec-Fetch-Mode': "navigate",
'Sec-Fetch-User': "?1",
'Sec-Fetch-Dest': "document",
'Referer': "https://www.oschina.net/project",
'Accept-Encoding': "gzip, deflate, br",
'Accept-Language': "zh-CN,zh;q=0.9",
'Cookie': "_user_behavior_=27bc3099-b9e3-49a6-826b-56aaf822fae5; Hm_lvt_eaa57ca47dacb4ad4f5a257001a3457c=1598520729; Hm_lpvt_eaa57ca47dacb4ad4f5a257001a3457c=1598600848; __gads=ID=d5240dd224818d75:T=1598835624:S=ALNI_MbHQ4_sg1sXdPxkF-c2idfRjucesw; _reg_key_=I84aRYir5tsF8TPSpTTf; yp_riddler_id=9ab2b8fd-9351-4647-a977-eb1dfd58c075; oscid=GJcECObMpGrIDRHg6jyIzTT79fNv7A4Vt%2FZUyPr%2FpUrS5mxAE2K9To66AQ5eVs8GqpmVqe2vR38PlQIfl4QfwO6JJscK6tfltKgaNvjqhC0g6urKhXyw%2FaCEY2GJOSVs21pxmN6HZnxrhwfqZTVm0w%3D%3D; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1598835624,1598835634,1598838223,1598838232; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1598838308",
'cache-control': "no-cache",
# 'Postman-Token': "af2d9a8e-cd98-4e90-b469-8a78b901bde0"
}
# response = requests.request("GET", url, data=payload, headers=headers)
session = requests.Session()
with session:
    response = session.get(url, headers=headers)
    text = response.text
    print(text[:400])
    with open('./oschina.html', 'w', encoding='utf-8') as f:
        f.write(text)
#--------------------------------------------------------
oschina.html
<a class="item logout-btn"><i class="sign out icon grey"></i>退出</a>
The presence of the logout link shows that the login-free access worked; there is more than one way to verify a successful login.
Sina Weibo and similar sites work the same way: as long as "remember me" is allowed, you can log in once and then crawl content with this approach.
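One simple check (a sketch building on the snippet above; text is the response body saved earlier, and the logout-btn class comes from the saved page):

from bs4 import BeautifulSoup

soup = BeautifulSoup(text, 'lxml')
# the logged-in page contains <a class="item logout-btn">...</a>; an anonymous page does not
if soup.select_one('a.logout-btn'):
    print('login-free access OK')
else:
    print('cookie expired or invalid, log in again')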
2. Crawling cnblogs with multiple threads
The cnblogs news pages are paginated as https://www.cnblogs.com/#p2; crawl the news titles and links in batches with multiple threads.
Locate the post_list section (for example with XPath) and first crawl a single page to see how it works.
# crawl a single page first
import requests
from bs4 import BeautifulSoup
# https://www.cnblogs.com/lingq/p/13030573.html
# "https://www.cnblogs.com/#p100" page
BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"
headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}

def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        if i == 1:
            url = '{}{}'.format(BASE_URL, NEWSPATH)
        else:
            url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the URL together with format
        yield url  # even complex URLs are mostly a matter of piecing parameters together

# for urls in gen_urls(1, 1):
#     print(urls)

def crawler():
    for urls in gen_urls(1, 1):
        response = requests.request("GET", urls, headers=headers)
        text = response.text
        soup = BeautifulSoup(text, 'lxml')
        news = soup.select('article section div a')
        for new in news:
            url = new.attrs.get('href')
            title = new.text
            print(url, title)

crawler()
#--------------------------------------------------------------
https://www.cnblogs.com/youzidexiatian/p/13540629.html Docker 部署 redis教程,附带部分小建议,防止踩坑
https://www.cnblogs.com/vege/p/13591593.html Cobalt strike与内网渗透
https://www.cnblogs.com/ma214617/p/13591556.html 看百度技术专家如何深入研究,重复使用的代码经验——设计模式
https://www.cnblogs.com/zhangjx2457/p/13591491.html 一、MySQL下载和安装
https://www.cnblogs.com/loceaner/p/LGP4343.html 洛谷 P4343 [SHOI2015]自动刷题机
https://www.cnblogs.com/jason0529/p/13591150.html Urule开源版系列4——Core包核心接口之规则解析过程
https://www.cnblogs.com/liutongqing/p/13591093.html 深入理解C++中的new/delete和malloc/free动态内存管理
https://www.cnblogs.com/tencent-cloud-native/p/13591039.html 如何扩展单个Prometheus实现近万Kubernetes集群监控?
https://www.cnblogs.com/lm970585581/p/13590761.html 什么是消息中间件?主要作用是什么?
https://www.cnblogs.com/nlskyfree/p/13590598.html Kafka Broker源码解析一:网络层设计
https://www.cnblogs.com/jackson0714/p/thread_safe_collections.html # 全网最细 | 21张图带你领略集合的线程不安全
https://www.cnblogs.com/csh24/p/13590338.html Java中的静态代理和动态代理
https://www.cnblogs.com/didijishu/p/13590106.html 滴滴数据仓库指标体系建设实践
https://www.cnblogs.com/boboooo/p/13589467.html Spring整合WebSocket
https://www.cnblogs.com/txxunmei/p/13589327.html 由浅入深理解 IOC 和 DI
https://www.cnblogs.com/mayite/p/13589187.html 数据分析与数据挖掘 - 01入门介绍
https://www.cnblogs.com/edison0621/p/13586715.html Blazor带我重玩前端(五)
https://www.cnblogs.com/cplemom/p/13585051.html C# 解析获取Url参数值
https://www.cnblogs.com/Hui4401/p/13588985.html Python协程之asyncio
https://www.cnblogs.com/orange-snow/p/13588851.html dlopen代码详解——从ELF格式到mmap
First, urlq keeps producing URLs; the crawl function takes a URL from it, fetches the page, and puts the HTML onto the htmls queue.
parse takes whatever HTML arrives on that queue and does the relatively slow parsing.
Storage is handed off through another queue, outputs.
# multithreaded solution
import requests, threading
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
# https://www.cnblogs.com/lingq/p/13030573.html
# "https://www.cnblogs.com/#p100" page
BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"
headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}
# in-process queues; later replaced with a third-party message queue
urlq = Queue()     # URL queue
outputs = Queue()  # result output queue
htmls = Queue()    # response data queue
event = threading.Event()

def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        if i == 1:
            url = '{}{}'.format(BASE_URL, NEWSPATH)
        else:
            url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the URL together with format
        urlq.put(url)
        print(url, '-----------------------------')

# for urls in gen_urls(1, 1):
#     print(urls)

def crawler():
    while not event.wait(1):
        try:
            urls = urlq.get(True, 1)
            response = requests.request("GET", urls, headers=headers)  # urls is the page URL
            text = response.text
            htmls.put(text)
        except:
            print('crawler error')

def parse():
    while not event.wait(1):
        try:
            html = htmls.get(True, 1)
            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')
            for new in news:
                # href = new.attrs.get('href', None)
                # if href:
                url = new.attrs.get('href')  # article URL
                title = new.text
                outputs.put((url, title))
        except:
            print('parse error')

# persistence function
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                text = '{}\x01{}\n'.format(*outputs.get(True, 1))
                print(text)
                f.write(text)
                f.flush()
            except:
                print('persist error')

# high-level asynchronous library, recommended
executer = ThreadPoolExecutor(10)
executer.submit(gen_urls, 1, 1)
executer.submit(crawler)
executer.submit(parse)
executer.submit(persist, './new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())
----------------------------------------------------------------------
C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/t11.py
# press Enter to list the threads that have been started
>>>
[<_MainThread(MainThread, started 71120)>, <Thread(ThreadPoolExecutor-0_0, started daemon 71296)>, <Thread(ThreadPoolExecutor-0_1, started daemon 71280)>, <Thread(ThreadPoolExecutor-0_2, started daemon 70892)>, <Thread(ThreadPoolExecutor-0_3, started daemon 17432)>]
>>>
[<_MainThread(MainThread, started 71120)>, <Thread(ThreadPoolExecutor-0_0, started daemon 71296)>, <Thread(ThreadPoolExecutor-0_1, started daemon 71280)>, <Thread(ThreadPoolExecutor-0_2, started daemon 70892)>, <Thread(ThreadPoolExecutor-0_3, started daemon 17432)>]
>>>
# page #p1 crawled successfully
https://www.cnblogs.com/txxunmei/p/13592118.html 详细分析链表的数据结构的实现过程(Java 实现)
https://www.cnblogs.com/libolun/p/13592114.html day41:MYSQL:select查询练习题
https://www.cnblogs.com/snidget/p/13592106.html 如何快速适应新工作?
https://www.cnblogs.com/javadss/p/13584419.html 《Java从入门到失业》第三章:基础语法及基本程序结构(3.7):运算符(自增自减、关系运算、逻辑运算、条件运算、位运算、赋值运算、类型转换)
https://www.cnblogs.com/AllenMaster/p/13589170.html Azure Storage 系列(一)入门简介
https://www.cnblogs.com/python2/p/13592095.html AlexNet实现cifar10数据集分类
https://www.cnblogs.com/pythonista/p/13592043.html Python 到底是强类型语言,还是弱类型语言?
https://www.cnblogs.com/wyq178/p/13520745.html 那些jdk中坑你没商量的方法
https://www.cnblogs.com/xingrenguanxue/p/13591846.html 【Go语言入门系列】(七)如何使用Go的方法?
https://www.cnblogs.com/xiaoqi/p/code-naming-tool.html 来,我们一起打造一款代码命名工具
https://www.cnblogs.com/youzidexiatian/p/13540629.html Docker 部署 redis教程,附带部分小建议,防止踩坑
https://www.cnblogs.com/vege/p/13591593.html Cobalt strike与内网渗透
https://www.cnblogs.com/jason0529/p/13591150.html Urule开源版系列4——Core包核心接口之规则解析过程
https://www.cnblogs.com/liutongqing/p/13591093.html 深入理解C++中的new/delete和malloc/free动态内存管理
https://www.cnblogs.com/tencent-cloud-native/p/13591039.html 如何扩展单个Prometheus实现近万Kubernetes集群监控?
https://www.cnblogs.com/lm970585581/p/13590761.html 什么是消息中间件?主要作用是什么?
https://www.cnblogs.com/nlskyfree/p/13590598.html Kafka Broker源码解析一:网络层设计
https://www.cnblogs.com/jackson0714/p/thread_safe_collections.html # 全网最细 | 21张图带你领略集合的线程不安全
https://www.cnblogs.com/csh24/p/13590338.html Java中的静态代理和动态代理
https://www.cnblogs.com/didijishu/p/13590106.html 滴滴数据仓库指标体系建设实践
Parsing the content is fairly time-consuming and should not be done synchronously inside crawler; a queue is used to decouple it in the same way.
Right now each thread takes one piece of data, finishes, and exits; change them so they keep pulling data from the queues.
# use all 10 threads in the pool
import requests, threading, time
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"
headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}
# in-process queues; later replaced with a third-party message queue
urls = Queue()     # URL queue
outputs = Queue()  # result output queue
htmls = Queue()    # response data queue
event = threading.Event()

def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the URL together with format
        urls.put(url)

def crawler():
    while not event.is_set():
        try:
            url = urls.get(True, 1)
            print(url)
            response = requests.request("GET", url, headers=headers)  # url is the page URL
            html = response.text
            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')
            print(news)
            # print(html)
            htmls.put(html)
        except:
            print('crawler error')

def parse():
    while not event.is_set():
        try:
            html = htmls.get(True, 1)
            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')
            for new in news:
                # href = new.attrs.get('href', None)
                # if href:
                url = new.attrs.get('href')  # article URL
                title = new.text
                outputs.put((url, title))
        except:
            print('parse error')

# persistence function
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                text = '{}\x01{}\n'.format(*outputs.get(True, 1))
                # print(text)
                f.write(text)
                f.flush()
            except:
                print('persist error')

# high-level asynchronous library, recommended
executer = ThreadPoolExecutor(10)
executer.submit(gen_urls, 1, 2)
for _ in range(4):
    executer.submit(crawler)
for _ in range(5):
    executer.submit(parse)
executer.submit(persist, './new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())
Going further (message queues)
Replace the in-process queues with a third-party service; here the widely used RabbitMQ is chosen.
Setting up the RabbitMQ service is omitted here; see the related course material.
1. Choosing the queue work mode
Take the crawler's htmls queue as an example: it is written to by several producers (the crawl functions) and read by several consumers (the parse functions), and each message only needs to be handled by one consumer. So RabbitMQ's work queue mode is used.
Both the producer side and the consumer side of RabbitMQ can declare exchanges and queues.
How are messages distributed among the queues?
In the end it is all routing: RabbitMQ's simple queue and work queue are really just the routing (direct) mode, only using the default exchange.
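For illustration, publishing through the default exchange looks like this (a minimal sketch; the broker URL and credentials are placeholders, not the ones used later in these notes):

import pika

# placeholder broker URL and credentials, for illustration only
params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.queue_declare(queue='htmls')
# an empty exchange name means the default exchange: the routing key is simply the queue name
channel.basic_publish(exchange='', routing_key='htmls', body='hello work queue')
connection.close()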
Should a queue be deleted when one side disconnects?
Every piece of data has to be processed; the queue must not be deleted just because one end disconnects, otherwise data would be lost.
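A declaration sketch for such a queue: exclusive=False (as in the test code below) keeps the queue when a consumer disconnects; durable=True is an extra assumption not used in these notes that additionally keeps the queue across broker restarts. Note that re-declaring an existing queue with different properties raises a channel error, so the declarations must stay consistent.

# exclusive=True would tie the queue to this connection and delete it on disconnect;
# exclusive=False keeps the queue (and its messages) when a consumer goes away;
# durable=True (optional assumption) also survives broker restarts
channel.queue_declare(queue='htmls', exclusive=False, durable=True)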
Test code
# consumer.py  consumer code
import pika

en = 'news'  # exchange name; constants go at the top
qns = ('urls', 'htmls', 'outputs')
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()  # open a channel over the AMQP connection

channel.exchange_declare(
    exchange=en,             # exchange
    exchange_type='direct'   # routing
)
# fanout: every queue bound to this exchange receives the message (broadcast)
# direct: only queues whose binding key matches the routing key receive the message (selective)
# topic:  queues whose binding pattern matches the routing key receive the message (finer filtering)

# declaring a new queue with exclusive=True would delete it once the consumer disconnects from RabbitMQ
q1 = channel.queue_declare(qns[0], exclusive=False)  # a consumer's queue generally must not be deleted on disconnect
q2 = channel.queue_declare(qns[1], exclusive=False)
q3 = channel.queue_declare(qns[2], exclusive=False)

# bindings tell the exchange which queues to deliver messages to
channel.queue_bind(exchange=en, queue=qns[0])  # routing_key defaults to the queue name
channel.queue_bind(exchange=en, queue=qns[1])
channel.queue_bind(exchange=en, queue=qns[2])
# channel.queue_bind(exchange=en, queue=name2, routing_key=topic[2])

def callback(channel, method, properties, body):
    print(body)

with connection:  # multiple consumers: tag1 / tag2
    tag1 = channel.basic_consume(qns[1], callback, True)  # consumes data; blocks when there is nothing to consume
    tag2 = channel.basic_consume(qns[1], callback, True)  # consumes data
    channel.start_consuming()  # runs forever, does not stop
    print('receive message ok')
The routing_key defaults to the queue name when it is not given explicitly.
# send.py  producer code (see the sketch below)
send.py pushes 20 messages to qns[0] = urls, but the two consumers (tag1/tag2) above are consuming from htmls, so the 20 messages remain in the urls queue.
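send.py itself is not reproduced in the notes; a minimal sketch of what it might look like (the broker URL, exchange and queue names follow the consumer above; the message payload is just an example):

# send.py  producer code (illustrative sketch)
import pika

en = 'news'
qns = ('urls', 'htmls', 'outputs')
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.exchange_declare(exchange=en, exchange_type='direct')
channel.queue_declare(qns[0], exclusive=False)
channel.queue_bind(exchange=en, queue=qns[0])

with connection:
    for i in range(20):
        # routing key = queue name, so the messages land in the urls queue
        channel.basic_publish(exchange=en, routing_key=qns[0], body='data-{}'.format(i))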
2. Refactoring into a message queue class
The multithreaded code above is not written well: it relies heavily on global variables and is only meant to illustrate the idea.
In the RabbitMQ management UI (rabbitmq:15672), first delete all existing queues and the news exchange.
# producer.py  producer code
import pika, random

class Producer:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(  # declare the exchange
            exchange=exchangename,      # exchange
            exchange_type='direct'      # routing
        )
        self.exchangename = exchangename
        self.queuename = queuename
        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

    def sendmsg(self, msg: str):
        self.channel.basic_publish(
            exchange=self.exchangename,
            routing_key=self.queuename,  # if routing_key is not set, the queue name is used
            body=msg                     # message body
        )

if __name__ == '__main__':
    p1 = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
    for i in range(40):
        p1.sendmsg('data-{}'.format(i))
-----------------------------------------------------------------
# consumer.py  consumer code
import pika, time

class Consumer:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(  # declare the exchange
            exchange=exchangename,      # exchange
            exchange_type='direct'      # routing
        )
        self.exchangename = exchangename
        self.queuename = queuename
        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

    def recvmsg(self):
        # _, _, body = self.channel.basic_get('hello', True)
        # print(body)
        return self.channel.basic_get(self.queuename, True)[2]  # basic_get returns a (method, properties, body) 3-tuple

if __name__ == '__main__':
    c1 = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
    for i in range(10):  # try 10 times
        msg = c1.recvmsg()
        if msg:
            print(msg)
        else:
            time.sleep(1)
            print('sleep 1')
--------------------------------------------------
C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/consumer.py
b'data-6'
b'data-7'
b'data-8'
b'data-9'
b'data-10'
b'data-11'
b'data-12'
b'data-13'
b'data-14'
b'data-15'
The Consumer and Producer code are very similar; can they be merged?
# messagequeue.py  MessageQueue code
import pika, random

class MessageQueue:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(  # declare the exchange
            exchange=exchangename,      # exchange
            exchange_type='direct'      # routing
        )
        self.exchangename = exchangename
        self.queuename = queuename
        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

class Producer(MessageQueue):
    def sendmsg(self, msg: str):
        self.channel.basic_publish(
            exchange=self.exchangename,
            routing_key=self.queuename,  # if routing_key is not set, the queue name is used
            body=msg                     # message body
        )

class Consumer(MessageQueue):
    def recvmsg(self):
        return self.channel.basic_get(self.queuename, True)[2]  # (method, properties, body) 3-tuple

# ---------------------------------------- not yet polished
if __name__ == '__main__':
    p1 = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
    for i in range(40):
        p1.sendmsg('data-{}'.format(i))
Summary: tag = channel.basic_consume() / channel.start_consuming() (runs forever) only stops once every consumer has been cancelled; channel.basic_get is recommended instead.
The shared connection setup is extracted into a base class.
A crawler design that separates functionality from deployment
# decentralized crawler scheme
# spider.py
import requests, threading, time, simplejson
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from messagequeue import Producer, Consumer

BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"
headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}
# the in-process queues are replaced with the third-party message queue
# urls = Queue()     # URL queue
# outputs = Queue()  # result output queue
# htmls = Queue()    # response data queue
event = threading.Event()

def gen_urls(start, end, step=1):
    p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'urls')
    for i in range(start, end + 1, step):
        url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the URL together with format
        if url:
            p.sendmsg(url)
            print(url)
        else:
            print(url, type(url))

def crawler():
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'urls')   # could be constants or a config file
        p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
        while not event.wait(1):
            # url = urls.get(True, 1)
            url = c.recvmsg()  # take one URL from the queue
            if url:
                # with requests.request("GET", url, headers=headers) as response:
                response = requests.request("GET", url, headers=headers)  # url is the page URL
                text = response.text
                # htmls.put(text)
                p.sendmsg(text)
            else:
                pass
                # print('crawler sendmsg error')  # URL is None: wait a second before trying again
    except Exception as e:
        print(e, '1111')

def parse():
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
        p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'outputs')
        while not event.is_set():  # loop until the shutdown event is set
            # html = htmls.get(True, 1)
            html = c.recvmsg()
            if html:
                soup = BeautifulSoup(html, 'lxml')
                news = soup.select('article section div a')
                for new in news:
                    # href = new.attrs.get('href', None)
                    # if href:
                    url = new.attrs.get('href')  # article URL
                    title = new.text
                    # outputs.put((url, title))
                    # p.sendmsg("\x01".join((url, title)))
                    p.sendmsg(simplejson.dumps({'url': url, 'title': title}))  # serialize to JSON text with simplejson
            else:
                event.wait(1)
    except Exception as e:
        print(e, '2222')

# persistence function
def persist(path):
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'outputs')
        with open(path, 'a+', encoding='utf-8') as f:
            while not event.is_set():  # consume as fast as possible while there is data
                try:
                    # text = '{}\x01{}\n'.format(*outputs.get(True, 1))
                    data = c.recvmsg()
                    if data:
                        data = simplejson.loads(data)
                        text = '{}\x01{}\n'.format(data['url'], data['title'])
                        print(text)  # print the record
                        f.write(text)
                        f.flush()
                    else:
                        event.wait(1)  # no data: wait a second and try again
                except Exception as e:
                    raise
    except Exception as e:
        print(e, '3333')

# high-level asynchronous library, recommended
executer = ThreadPoolExecutor(10)
executer.submit(gen_urls, 1, 50)
for _ in range(4):
    executer.submit(crawler)
for _ in range(5):
    executer.submit(parse)
executer.submit(persist, './new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())
# messagequeue.py
import pika, random

class MessageQueue:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(  # declare the exchange
            exchange=exchangename,      # exchange
            exchange_type='direct'      # routing
        )
        self.exchangename = exchangename
        self.queuename = queuename
        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

class Producer(MessageQueue):
    def sendmsg(self, msg: str):
        self.channel.basic_publish(
            exchange=self.exchangename,
            routing_key=self.queuename,  # if routing_key is not set, the queue name is used
            body=msg                     # message body
        )

class Consumer(MessageQueue):
    def recvmsg(self):
        return self.channel.basic_get(self.queuename, True)[2]  # (method, properties, body) 3-tuple

if __name__ == '__main__':
    p1 = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
    for i in range(40):
        p1.sendmsg('data-{}'.format(i))
The three queues operate independently of each other.
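Because the queues now live in RabbitMQ rather than inside the process, each role can be started on a different machine. A minimal launcher sketch (the node.py file, the --role argument handling and the thread counts are illustrative, not part of the original code; it assumes the ThreadPoolExecutor scheduling at the bottom of spider.py is moved under an if __name__ == '__main__' guard so importing the module starts nothing):

# node.py  illustrative launcher: run one role per machine
import sys
from concurrent.futures import ThreadPoolExecutor

import spider  # the spider.py module above

if __name__ == '__main__':
    role = sys.argv[1] if len(sys.argv) > 1 else 'crawler'
    executor = ThreadPoolExecutor(5)
    if role == 'gen':
        executor.submit(spider.gen_urls, 1, 50)         # URL collection node
    elif role == 'crawler':
        for _ in range(4):
            executor.submit(spider.crawler)             # fetching node
    elif role == 'parse':
        for _ in range(4):
            executor.submit(spider.parse)               # parsing node
    elif role == 'persist':
        executor.submit(spider.persist, './new.log')    # storage node
    executor.shutdown(wait=True)  # workers run until the process is stopped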