64.5 - Simulated Login and Multi-threaded / Asynchronous Crawlers

Author: BeautifulSoulpy | Published 2020-09-01 17:41

Many people don't know what they want; the truth is they lack the courage to face it and the effort to fight for what they want!


Summary:

  1. requests' GET method is like opening a fresh browser every time; a Session keeps the same cookies across multiple requests.
  2. If a site allows "remember me" login: as long as the cookie has not expired, you can keep accessing the site without logging in again, even from a different browser and even after the account has been logged out elsewhere.
  3. Key points: how sessions work; the difference between requests.Session().get and requests.request('GET').
  4. Crawlers: choose third-party tools according to your data volume.
  5. The Event class event-handling mechanism: a global "flag" is defined; while the flag is False, event.wait() blocks; once the flag is True, event.wait() no longer blocks (see the sketch after this list).
  6. The right way to build a crawler: modularize it yourself; this can easily reach framework-level quality.
    Modular design: separate components handle URL collection, page fetching, data parsing and storage; they can run on different machines and different nodes.
  7. if __name__ == '__main__' means: when the .py file is run directly, the block under if __name__ == '__main__' is executed; when the .py file is imported as a module, that block is not executed.
  8. tag = channel.basic_consume / channel.start_consuming() runs forever; it only stops after all consumers have been cancelled. channel.basic_get is recommended instead.
  9. Scattered deployment: the separately deployed programs do not communicate; each runs on its own.
    Distributed deployment: the separately deployed programs communicate with each other.
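A minimal sketch of the Event flag behaviour from point 5 (the worker function and the timings are illustrative only):

import threading, time

event = threading.Event()          # the global "flag", initially False

def worker():
    while not event.wait(1):       # waits up to 1 second; returns False while the flag is False
        print('working...')
    print('flag is True, worker exits')

threading.Thread(target=worker).start()
time.sleep(3)
event.set()                        # flip the flag to True; wait() now returns True immediately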

1. Session-based login-free access to oschina (same idea for Sina)

After logging in once, a user can usually act under that identity for a period of time without logging in again and again. Behind this is typically cookie technology.

After login, the user receives a cookie value. It is kept in the current browser session and, as long as it does not expire, it can even be kept for a long time.
Each time the user sends a request to the server, these cookies are submitted with it; the server analyses the information in the cookie to confirm the user's identity, and if the identity is trusted, the user can continue to use the site's features.

A Session object lets you persist certain parameters across requests. It also keeps cookies across all requests made from the same Session instance.
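A minimal sketch of this behaviour (the httpbin.org test endpoints used here are only a convenient assumption for illustration):

import requests

session = requests.Session()
# The server sets a cookie; the Session stores it ...
session.get('https://httpbin.org/cookies/set/sessioncookie/123456')
# ... and sends it back automatically on the next request from the same Session.
print(session.get('https://httpbin.org/cookies').json())    # {'cookies': {'sessioncookie': '123456'}}

# A plain GET starts from scratch every time -- no cookies are carried over.
print(requests.get('https://httpbin.org/cookies').json())   # {'cookies': {}}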

Cookie

Invented by Netscape. A cookie is generally a name=value pair, but it can also carry an expire time, a path, a domain, a secure flag and other attributes.
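For illustration, a hypothetical Set-Cookie response header carrying these attributes could look like this (the value is made up):

Set-Cookie: oscid=abc123; Path=/; Domain=.oschina.net; Expires=Thu, 01 Oct 2020 00:00:00 GMT; Secure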

Comparing the cookie values before and after login, we find that oscid only appears after login.
So we put this HTTP request header into the code (or, while logged in, use the headers captured from another page the site redirects to).

Take the cookies of a logged-in, login-free request and copy the request information into Postman.

Working with Postman

In Postman, click the generated code next to cookies and choose Python;
strip the payload, the Postman-Token header and the requests.request("GET", ...) call, and we get the code we want.

import requests

url = "https://www.oschina.net/"

# payload = ""
headers = {
    'Host': "www.oschina.net",
    'Connection': "keep-alive",
    'Upgrade-Insecure-Requests': "1",
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36",
    'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    'Sec-Fetch-Site': "same-origin",
    'Sec-Fetch-Mode': "navigate",
    'Sec-Fetch-User': "?1",
    'Sec-Fetch-Dest': "document",
    'Referer': "https://www.oschina.net/project",
    'Accept-Encoding': "gzip, deflate, br",
    'Accept-Language': "zh-CN,zh;q=0.9",
    'Cookie': "_user_behavior_=27bc3099-b9e3-49a6-826b-56aaf822fae5; Hm_lvt_eaa57ca47dacb4ad4f5a257001a3457c=1598520729; Hm_lpvt_eaa57ca47dacb4ad4f5a257001a3457c=1598600848; __gads=ID=d5240dd224818d75:T=1598835624:S=ALNI_MbHQ4_sg1sXdPxkF-c2idfRjucesw; _reg_key_=I84aRYir5tsF8TPSpTTf; yp_riddler_id=9ab2b8fd-9351-4647-a977-eb1dfd58c075; oscid=GJcECObMpGrIDRHg6jyIzTT79fNv7A4Vt%2FZUyPr%2FpUrS5mxAE2K9To66AQ5eVs8GqpmVqe2vR38PlQIfl4QfwO6JJscK6tfltKgaNvjqhC0g6urKhXyw%2FaCEY2GJOSVs21pxmN6HZnxrhwfqZTVm0w%3D%3D; Hm_lvt_a411c4d1664dd70048ee98afe7b28f0b=1598835624,1598835634,1598838223,1598838232; Hm_lpvt_a411c4d1664dd70048ee98afe7b28f0b=1598838308",
    'cache-control': "no-cache",
    # 'Postman-Token': "af2d9a8e-cd98-4e90-b469-8a78b901bde0"
    }

# response = requests.request("GET", url, data=payload, headers=headers)
session = requests.Session()  # one Session keeps the cookies above across requests

with session:
    response = session.get(url, headers=headers)
    text = response.text
    print(text[:400])

    with open('./oschina.html','w',encoding='utf-8') as f:
        f.write(text)
#--------------------------------------------------------
oschina.html
<a class="item logout-btn"><i class="sign out icon grey"></i>退出</a>

The logout button shows that our login-free access succeeded; there are several ways to verify that login worked.
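For example, one simple programmatic check (a sketch; the exact marker depends on the page markup) is to look for the logout button in the returned HTML:

if 'logout-btn' in text:
    print('login-free access OK')
else:
    print('cookie expired or invalid, need to log in again')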

Sina Weibo and similar sites work the same way: as long as the site allows "remember me" login, you can log in once and then crawl content with the method above.

2. Crawling cnblogs with multiple threads

The cnblogs news pages are paginated as https://www.cnblogs.com/#p2 and so on; we crawl the news titles and links in batches with multiple threads.

Use XPath to locate the post_list section of the page.

Crawl a single page first to try it out.

# Crawl a single page first
import requests
from bs4 import BeautifulSoup

# https://www.cnblogs.com/lingq/p/13030573.html
# "https://www.cnblogs.com/#p100" page
BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"

headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
    }

def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        if i == 1:
            url = '{}{}'.format(BASE_URL, NEWSPATH)
        else:
            url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)    # piece the page URL together with format
        yield url        # complicated URLs are mostly just a matter of piecing the parts together

# for urls in gen_urls(1,1):
#     print(urls)


def crawler():
    for urls in gen_urls(1,1):
        response= requests.request("GET", urls, headers=headers)
        text = response.text

        soup = BeautifulSoup(text, 'lxml')
        news = soup.select('article section div a')

        for new in news:
            url = new.attrs.get('href')
            title = new.text
            print(url,title)

crawler()
#--------------------------------------------------------------
https://www.cnblogs.com/youzidexiatian/p/13540629.html Docker 部署 redis教程,附带部分小建议,防止踩坑
https://www.cnblogs.com/vege/p/13591593.html Cobalt strike与内网渗透
https://www.cnblogs.com/ma214617/p/13591556.html 看百度技术专家如何深入研究,重复使用的代码经验——设计模式
https://www.cnblogs.com/zhangjx2457/p/13591491.html 一、MySQL下载和安装
https://www.cnblogs.com/loceaner/p/LGP4343.html 洛谷 P4343 [SHOI2015]自动刷题机
https://www.cnblogs.com/jason0529/p/13591150.html Urule开源版系列4——Core包核心接口之规则解析过程
https://www.cnblogs.com/liutongqing/p/13591093.html 深入理解C++中的new/delete和malloc/free动态内存管理
https://www.cnblogs.com/tencent-cloud-native/p/13591039.html 如何扩展单个Prometheus实现近万Kubernetes集群监控?
https://www.cnblogs.com/lm970585581/p/13590761.html 什么是消息中间件?主要作用是什么?
https://www.cnblogs.com/nlskyfree/p/13590598.html Kafka Broker源码解析一:网络层设计
https://www.cnblogs.com/jackson0714/p/thread_safe_collections.html # 全网最细 | 21张图带你领略集合的线程不安全
https://www.cnblogs.com/csh24/p/13590338.html Java中的静态代理和动态代理
https://www.cnblogs.com/didijishu/p/13590106.html 滴滴数据仓库指标体系建设实践
https://www.cnblogs.com/boboooo/p/13589467.html Spring整合WebSocket
https://www.cnblogs.com/txxunmei/p/13589327.html 由浅入深理解 IOC 和 DI
https://www.cnblogs.com/mayite/p/13589187.html 数据分析与数据挖掘 - 01入门介绍
https://www.cnblogs.com/edison0621/p/13586715.html Blazor带我重玩前端(五)
https://www.cnblogs.com/cplemom/p/13585051.html C# 解析获取Url参数值
https://www.cnblogs.com/Hui4401/p/13588985.html Python协程之asyncio
https://www.cnblogs.com/orange-snow/p/13588851.html dlopen代码详解——从ELF格式到mmap

First, urlq continuously produces URLs; the crawler function takes a URL from it, fetches the page and puts the HTML into the htmls queue.
parse takes whatever HTML arrives and performs the relatively slow parsing on it.
Storage is handed off through yet another queue, outputs.

# Multi-threaded solution
import requests, threading
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

# https://www.cnblogs.com/lingq/p/13030573.html
# "https://www.cnblogs.com/#p100" page
BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"

headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}

# In-process queues; these will later be replaced by a third-party queue service.
# Multi-threaded solution.
urlq = Queue()     # queue of URLs to fetch
outputs = Queue()  # queue of parsed results
htmls = Queue()    # queue of fetched page HTML

event = threading.Event()


def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        if i == 1:
            url = '{}{}'.format(BASE_URL, NEWSPATH)
        else:
            url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the page URL together with format
        urlq.put(url)
        print(url, '-----------------------------')


# for urls in gen_urls(1,1):
#     print(urls)

def crawler():
    while not event.wait(1):
        try:
            urls = urlq.get(True, 1)  # block for up to 1 second waiting for a URL
            response = requests.request("GET", urls, headers=headers)  # fetch the page
            text = response.text
            htmls.put(text)
        except:
            print('crawler error')


def parse():
    while not event.wait(1):
        try:
            html = htmls.get(True, 1)  # block for up to 1 second waiting for a page
            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')

            for new in news:
                url = new.attrs.get('href')  # article URL
                title = new.text
                outputs.put((url, title))
        except:
            print('parse error')

# Persistence: write the results to a file
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                text = '{}\x01{}\n'.format(*outputs.get(True, 1))  # \x01 as field separator
                print(text)
                f.write(text)
                f.flush()
            except:
                print('persist error')

# High-level executor from concurrent.futures -- recommended
executer = ThreadPoolExecutor(10)

executer.submit(gen_urls,1,1)
executer.submit(crawler)
executer.submit(parse)
executer.submit(persist,'./new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())
----------------------------------------------------------------------
C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/t11.py
# press Enter to list the running threads

>>>
[<_MainThread(MainThread, started 71120)>, <Thread(ThreadPoolExecutor-0_0, started daemon 71296)>, <Thread(ThreadPoolExecutor-0_1, started daemon 71280)>, <Thread(ThreadPoolExecutor-0_2, started daemon 70892)>, <Thread(ThreadPoolExecutor-0_3, started daemon 17432)>]
>>>
[<_MainThread(MainThread, started 71120)>, <Thread(ThreadPoolExecutor-0_0, started daemon 71296)>, <Thread(ThreadPoolExecutor-0_1, started daemon 71280)>, <Thread(ThreadPoolExecutor-0_2, started daemon 70892)>, <Thread(ThreadPoolExecutor-0_3, started daemon 17432)>]
>>>
# page #p1 crawled successfully
https://www.cnblogs.com/txxunmei/p/13592118.html�详细分析链表的数据结构的实现过程(Java 实现)
https://www.cnblogs.com/libolun/p/13592114.html�day41:MYSQL:select查询练习题
https://www.cnblogs.com/snidget/p/13592106.html�如何快速适应新工作?
https://www.cnblogs.com/javadss/p/13584419.html�《Java从入门到失业》第三章:基础语法及基本程序结构(3.7):运算符(自增自减、关系运算、逻辑运算、条件运算、位运算、赋值运算、类型转换)
https://www.cnblogs.com/AllenMaster/p/13589170.html�Azure Storage 系列(一)入门简介
https://www.cnblogs.com/python2/p/13592095.html�AlexNet实现cifar10数据集分类
https://www.cnblogs.com/pythonista/p/13592043.html�Python 到底是强类型语言,还是弱类型语言?
https://www.cnblogs.com/wyq178/p/13520745.html�那些jdk中坑你没商量的方法
https://www.cnblogs.com/xingrenguanxue/p/13591846.html�【Go语言入门系列】(七)如何使用Go的方法?
https://www.cnblogs.com/xiaoqi/p/code-naming-tool.html�来,我们一起打造一款代码命名工具
https://www.cnblogs.com/youzidexiatian/p/13540629.html�Docker 部署 redis教程,附带部分小建议,防止踩坑
https://www.cnblogs.com/vege/p/13591593.html�Cobalt strike与内网渗透
https://www.cnblogs.com/jason0529/p/13591150.html�Urule开源版系列4——Core包核心接口之规则解析过程
https://www.cnblogs.com/liutongqing/p/13591093.html�深入理解C++中的new/delete和malloc/free动态内存管理
https://www.cnblogs.com/tencent-cloud-native/p/13591039.html�如何扩展单个Prometheus实现近万Kubernetes集群监控?
https://www.cnblogs.com/lm970585581/p/13590761.html�什么是消息中间件?主要作用是什么?
https://www.cnblogs.com/nlskyfree/p/13590598.html�Kafka Broker源码解析一:网络层设计
https://www.cnblogs.com/jackson0714/p/thread_safe_collections.html�# 全网最细 | 21张图带你领略集合的线程不安全
https://www.cnblogs.com/csh24/p/13590338.html�Java中的静态代理和动态代理
https://www.cnblogs.com/didijishu/p/13590106.html�滴滴数据仓库指标体系建设实践
https://www.cnblogs.com/txxunmei/p/13592118.html�详细分析链表的数据结构的实现过程(Java 实现)
https://www.cnblogs.com/libolun/p/13592114.html�day41:MYSQL:select查询练习题
https://www.cnblogs.com/snidget/p/13592106.html�如何快速适应新工作?
https://www.cnblogs.com/javadss/p/13584419.html�《Java从入门到失业》第三章:基础语法及基本程序结构(3.7):运算符(自增自减、关系运算、逻辑运算、条件运算、位运算、赋值运算、类型转换)
https://www.cnblogs.com/AllenMaster/p/13589170.html�Azure Storage 系列(一)入门简介
https://www.cnblogs.com/python2/p/13592095.html�AlexNet实现cifar10数据集分类
https://www.cnblogs.com/pythonista/p/13592043.html�Python 到底是强类型语言,还是弱类型语言?
https://www.cnblogs.com/wyq178/p/13520745.html�那些jdk中坑你没商量的方法
https://www.cnblogs.com/xingrenguanxue/p/13591846.html�【Go语言入门系列】(七)如何使用Go的方法?
https://www.cnblogs.com/xiaoqi/p/code-naming-tool.html�来,我们一起打造一款代码命名工具
https://www.cnblogs.com/youzidexiatian/p/13540629.html�Docker 部署 redis教程,附带部分小建议,防止踩坑
https://www.cnblogs.com/vege/p/13591593.html�Cobalt strike与内网渗透
https://www.cnblogs.com/jason0529/p/13591150.html�Urule开源版系列4——Core包核心接口之规则解析过程
https://www.cnblogs.com/liutongqing/p/13591093.html�深入理解C++中的new/delete和malloc/free动态内存管理
https://www.cnblogs.com/tencent-cloud-native/p/13591039.html�如何扩展单个Prometheus实现近万Kubernetes集群监控?
https://www.cnblogs.com/lm970585581/p/13590761.html�什么是消息中间件?主要作用是什么?
https://www.cnblogs.com/nlskyfree/p/13590598.html�Kafka Broker源码解析一:网络层设计
https://www.cnblogs.com/jackson0714/p/thread_safe_collections.html�# 全网最细 | 21张图带你领略集合的线程不安全
https://www.cnblogs.com/csh24/p/13590338.html�Java中的静态代理和动态代理
https://www.cnblogs.com/didijishu/p/13590106.html�滴滴数据仓库指标体系建设实践

Parsing the content is a relatively time-consuming step and should not be done synchronously inside crawler; again we use a queue to decouple the two.
At the moment each thread takes one piece of data and exits once it is done; change the functions so they keep pulling data from the queues.

# Use all 10 threads of the pool
import requests,  threading, time
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"

headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}

# In-process queues; these will later be replaced by a third-party queue service.
# Multi-threaded solution.
urls = Queue()     # queue of URLs to fetch
outputs = Queue()  # queue of parsed results
htmls = Queue()    # queue of fetched page HTML

event = threading.Event()

def gen_urls(start, end, step=1):
    for i in range(start, end + 1, step):
        url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the page URL together with format
        urls.put(url)

def crawler():
    while not event.is_set():
        try:
            url = urls.get(True, 1)  # block for up to 1 second waiting for a URL
            print(url)
            response = requests.request("GET", url, headers=headers)  # fetch the page
            html = response.text

            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')
            print(news)
            # print(html)
            htmls.put(html)
        except:
            print('crawler error')


def parse():
    while not event.is_set():
        try:
            html = htmls.get(True, 1)  # block for up to 1 second waiting for a page
            soup = BeautifulSoup(html, 'lxml')
            news = soup.select('article section div a')

            for new in news:
                url = new.attrs.get('href')  # article URL
                title = new.text
                outputs.put((url, title))
        except:
            print('parse error')

# Persistence: write the results to a file
def persist(path):
    with open(path, 'a+', encoding='utf-8') as f:
        while not event.is_set():
            try:
                text = '{}\x01{}\n'.format(*outputs.get(True, 1))  # \x01 as field separator
                # print(text)
                f.write(text)
                f.flush()
            except:
                print('persist error')

# High-level executor from concurrent.futures -- recommended
executer = ThreadPoolExecutor(10)

executer.submit(gen_urls,1,2)
for _ in range(4):
    executer.submit(crawler)
for _ in range(5):
    executer.submit(parse)

executer.submit(persist,'./new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())

Going further (message queues)

Replace the in-process queues with a third-party service; here we use the widely used RabbitMQ.
Setting up the RabbitMQ service itself is skipped here; see the related course material.
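Once the broker is up, a quick connectivity check with pika (assuming the user wayne and the vhost test used throughout this section already exist):

import pika

# Quick connectivity check against the broker used in the code below
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
conn = pika.BlockingConnection(params)
print('connected:', conn.is_open)
conn.close()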

1. Choosing the queue working mode

Take the crawler's htmls queue as an example: several producers (the crawl functions) write to it and several consumers (the parse functions) read from it, and each message should be handled by exactly one consumer. So we use RabbitMQ's work-queue mode.

In RabbitMQ, both the producer side and the consumer side may declare exchanges and queues.

How are messages dispatched from the queue?
In the end it is all routing: RabbitMQ's simple queue and work-queue modes are really the routing mode using the default exchange.

Should the queue be deleted on disconnect?
Every piece of data has to be processed; the queue must not be deleted just because one side disconnects, which would lose data.
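A minimal sketch of the default-exchange behaviour (assuming a RabbitMQ broker reachable on localhost with default credentials):

import pika

# With the default ("") exchange, the routing_key is simply the queue name.
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare('htmls', exclusive=False)   # keep the queue when the client disconnects
ch.basic_publish(exchange='', routing_key='htmls', body='hello')  # routed straight to the htmls queue
conn.close()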

Test code

# consumer.py -- consumer code
import pika

en = 'news'   # exchange name; constants go at the top
qns = ('urls','htmls','outputs')

params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)

channel = connection.channel()  # open an AMQP channel
channel.exchange_declare(
    exchange=en,            # exchange name
    exchange_type='direct'  # direct routing
)
# fanout: every queue bound to this exchange receives the message (broadcast)
# direct: only queues bound with a matching routing key receive the message (selective)
# topic: queues bound with a matching routing-key pattern receive the message (finer filtering)

# Declare the queues. With exclusive=True a queue is deleted once its consumer disconnects
# from RabbitMQ; a consumer generally must not lose the queue on disconnect, so exclusive=False.
q1 = channel.queue_declare(qns[0], exclusive=False)
q2 = channel.queue_declare(qns[1], exclusive=False)
q3 = channel.queue_declare(qns[2], exclusive=False)


# Bindings tell the exchange which queues to deliver messages to
channel.queue_bind(exchange=en, queue=qns[0])  # routing_key defaults to the queue name
channel.queue_bind(exchange=en, queue=qns[1])
channel.queue_bind(exchange=en, queue=qns[2])
# channel.queue_bind(exchange=en, queue=name2, routing_key=topic[2])

def callback(channel,method,properties,body):
    print(body)

with connection:  # two consumers, tag1 / tag2
    tag1 = channel.basic_consume(qns[1], callback, True)  # consume messages; waits when the queue is empty
    tag2 = channel.basic_consume(qns[1], callback, True)
    channel.start_consuming()   # runs forever until all consumers are cancelled

print('receive message ok')

The routing_key defaults to the queue name.
# send.py -- producer code (a sketch follows)
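A minimal sketch of such a producer, assuming the news exchange and the queues declared by the consumer above:

# send.py -- producer sketch (uses the exchange/queues declared by consumer.py)
import pika

en = 'news'
params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
connection = pika.BlockingConnection(params)
channel = connection.channel()

with connection:
    for i in range(20):
        # routing_key 'urls' matches the binding of the urls queue
        channel.basic_publish(exchange=en, routing_key='urls', body='url-{}'.format(i))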



20 messages were sent to qns[0] = urls, but the two consumers (tag1 / tag2) are waiting on the htmls queue, so the 20 messages stay in the urls queue.
2. Refactoring into message-queue classes

The multi-threaded code above is not well written: it relies heavily on global variables and was only meant to illustrate the idea.

First delete all queues and the news exchange in the RabbitMQ management UI (port 15672).
# producer.py -- producer code
import pika, random

class Producer:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(      # declare the exchange
            exchange=exchangename,          # exchange name
            exchange_type='direct'          # direct routing
        )
        self.exchangename = exchangename
        self.queuename = queuename

        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

    def sendmsg(self, msg: str):
        self.channel.basic_publish(
            exchange=self.exchangename,
            routing_key=self.queuename,  # if routing_key is not set, the queue name is used
            body=msg                     # message body
        )

if __name__ == '__main__':

    p1 = Producer('amqp://wayne:wayne@192.168.0.100:5672/test','news','htmls')
    for i in range(40):
        p1.sendmsg('data-{}'.format(i))

-----------------------------------------------------------------
# consumer.py -- consumer code
import pika, time

class Consumer:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(      # declare the exchange
            exchange=exchangename,          # exchange name
            exchange_type='direct'          # direct routing
        )
        self.exchangename = exchangename
        self.queuename = queuename

        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)

    def recvmsg(self):
        # _, _, body = self.channel.basic_get('hello', True)
        # print(body)
        return self.channel.basic_get(self.queuename, True)[2]  # basic_get returns a (method, properties, body) 3-tuple


if __name__ == '__main__':
    c1 = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test','news','htmls')
    for i in range(10):   # try to fetch up to 10 messages
        msg = c1.recvmsg()
        if msg:
            print(msg)
        else:
            time.sleep(1)
            print('sleep 1')
--------------------------------------------------
C:\ProgramData\Miniconda3\envs\blog\python.exe C:/Users/dell/PycharmProjects/spiders/consumer.py
b'data-6'
b'data-7'
b'data-8'
b'data-9'
b'data-10'
b'data-11'
b'data-12'
b'data-13'
b'data-14'
b'data-15'

The consumer and producer code are very similar; can we consolidate them?

# messagequeue.py -- the MessageQueue classes
import pika, random

class MessageQueue:
    def __init__(self, url, exchangename, queuename):
        params = pika.URLParameters(url)
        # params = pika.URLParameters('amqp://wayne:wayne@192.168.0.100:5672/test')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(      # declare the exchange
            exchange=exchangename,          # exchange name
            exchange_type='direct'          # direct routing
        )
        self.exchangename = exchangename
        self.queuename = queuename

        self.channel.queue_declare(queuename, exclusive=False)
        self.channel.queue_bind(queuename, exchangename)


class Producer(MessageQueue):
    def sendmsg(self,msg:str):
        self.channel.basic_publish(
            exchange=self.exchangename,
            routing_key=self.queuename,  # if routing_key is not set, the queue name is used
            body=msg                     # message body
        )


class Consumer(MessageQueue):
    def recvmsg(self):
        return self.channel.basic_get(self.queuename, True)[2]  # basic_get returns a (method, properties, body) 3-tuple
# ---------------------------------------- not fully polished yet
if __name__ == '__main__':

    p1 = Producer('amqp://wayne:wayne@192.168.0.100:5672/test','news','htmls')
    for i in range(40):
        p1.sendmsg('data-{}'.format(i))


Summary: tag = channel.basic_consume / channel.start_consuming() runs forever; it only stops after all consumers have been cancelled. channel.basic_get is recommended instead.
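A quick illustration of why basic_get is easier to control (a sketch that reuses a channel set up as in the consumer code above):

# Pull one message at a time: no callback registration, no blocking consume loop.
method, properties, body = channel.basic_get('htmls', auto_ack=True)
if body is None:      # an empty queue returns (None, None, None)
    print('no message available, try again later')
else:
    print('got:', body)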

Extracting a base class


A crawler design that separates functionality from deployment


# Scattered-deployment crawler
# spider.py
import requests,  threading, time, simplejson
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from messagequeue import Producer,Consumer


BASE_URL = "https://www.cnblogs.com/"
NEWSPATH = "#p"
headers = {
    'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36"
}

# The in-process queues are replaced by the third-party message queue:
# urls = Queue()     # queue of URLs
# outputs = Queue()  # queue of results
# htmls = Queue()    # queue of page HTML

event = threading.Event()

def gen_urls(start, end, step=1):
    p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'urls')
    for i in range(start, end + 1, step):
        url = '{}{}{}'.format(BASE_URL, NEWSPATH, i)  # piece the page URL together with format
        if url:
            p.sendmsg(url)
            print(url)
        else:
            print(url, type(url))

def crawler():
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'urls')  # could be a constant or a config-file entry
        p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
        while not event.wait(1):
            # url = urls.get(True, 1)
            url = c.recvmsg()  # take one URL from the queue
            if url:
                response = requests.request("GET", url, headers=headers)  # fetch the page
                text = response.text
                # htmls.put(text)
                p.sendmsg(text)
            else:
                pass  # url is None: the wait(1) above paces the retry
    except Exception as e:
        print('crawler error:', e)

def parse():
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'htmls')
        p = Producer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'outputs')
        while not event.is_set():  # non-blocking check; pacing is done by event.wait(1) below
            # html = htmls.get(True, 1)
            html = c.recvmsg()
            if html:
                soup = BeautifulSoup(html, 'lxml')
                news = soup.select('article section div a')

                for new in news:
                    url = new.attrs.get('href')  # article URL
                    title = new.text
                    # outputs.put((url, title))
                    # p.sendmsg("\x01".join((url, title)))
                    p.sendmsg(simplejson.dumps({'url': url, 'title': title}))  # serialize to JSON text with simplejson
            else:
                event.wait(1)
    except Exception as e:
        print('parse error:', e)

# Persistence: write the results to a file
def persist(path):
    try:
        c = Consumer('amqp://wayne:wayne@192.168.0.100:5672/test', 'news', 'outputs')
        with open(path, 'a+', encoding='utf-8') as f:
            while not event.is_set():  # drain the queue as fast as possible while there is data
                try:
                    # text = '{}\x01{}\n'.format(*outputs.get(True, 1))
                    data = c.recvmsg()
                    if data:
                        data = simplejson.loads(data)
                        text = '{}\x01{}\n'.format(data['url'], data['title'])  # \x01 as field separator
                        print(text)
                        f.write(text)
                        f.flush()
                    else:
                        event.wait(1)  # no data: wait a second before trying again
                except Exception as e:
                    raise
    except Exception as e:
        print('persist error:', e)

# High-level executor from concurrent.futures -- recommended
executer = ThreadPoolExecutor(10)

executer.submit(gen_urls, 1, 50)
for _ in range(4):
    executer.submit(crawler)
for _ in range(5):
    executer.submit(parse)

executer.submit(persist, './new.log')

while not event.is_set():
    cmd = input('>>>')
    if cmd.strip() == 'quit':
        break
    print(threading.enumerate())


# messagequeue.py -- the same MessageQueue / Producer / Consumer module shown above

The three queues each operate independently.
